Skip to main content

git_remote_object_store/protocol/
push.rs

1//! `push` handler with per-ref locking via conditional writes.
2//!
3//! Sequential-batch semantics: every `push <refspec>` line in a batch
4//! is processed in order under its own per-ref lock, and one outcome
5//! line is emitted per push (`ok <ref>\n` or `error <ref> "msg"\n`).
6//! `gix::Repository` is `!Sync` so the handler holds the repo handle
7//! on a single task — pushes never run in parallel within one client.
8//!
9//! Stdout discipline: this module returns [`PushOutcome`] values and
10//! never writes to the protocol stream itself. The REPL renders each
11//! outcome and the trailing blank-line terminator (see
12//! `.claude/rules/protocol-stdout.md`).
13
14use std::collections::HashSet;
15use std::env;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19
20use bytes::Bytes;
21use time::{Duration, OffsetDateTime};
22use tokio::sync::watch;
23use tokio::task::JoinHandle;
24use tracing::{debug, info, warn};
25
26use crate::git::{self, GitError, RefName, RefNameError, Sha, ShaError, is_valid_ref_name};
27use crate::keys;
28use crate::object_store::{ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts};
29use crate::packchain::gc::{tombstoned_bundle_keys, write_baseline_tombstone_best_effort};
30use crate::packchain::schema::Sha40;
31use crate::url::{BackendKind, StorageEngine};
32
/// Fallback per-ref lock TTL, in seconds, used whenever
/// [`ENV_LOCK_TTL_SECONDS`] is unset, unparseable, or zero.
///
/// This is the single home of the default. `manage::doctor`'s stale-lock
/// predicate, the CLI's `--lock-ttl-seconds` default (which resolves
/// through [`lock_ttl_from_env`] when the flag is absent), and the
/// integration tests that pin the wire format all read it. The management
/// surface re-exports it as [`crate::manage::DEFAULT_LOCK_TTL_SECONDS`],
/// so the different notions of a "stale" lock cannot silently diverge.
pub const DEFAULT_LOCK_TTL_SECONDS: u64 = 60;
42
43/// Push configuration that is constant across an entire batch.
44///
45/// Bundles the `zip`, `engine`, and `ttl` parameters so that [`push_one`]
46/// and [`perform_push_under_lock`] stay within the argument-count budget.
47struct PushConfig {
48    /// Whether to upload `repo.zip` alongside each bundle.
49    zip: bool,
50    /// Storage engine to lock into the `FORMAT` key on the first push.
51    engine: StorageEngine,
52    /// Per-ref lock TTL.
53    ttl: Duration,
54    /// Backend kind of the destination. Used by the zip-artifact path to
55    /// decide whether to attach the `codepipeline-artifact-revision-summary`
56    /// user-metadata header — that header is meaningful only on S3
57    /// (AWS `CodePipeline` consumes it) and the hyphenated key is invalid on
58    /// Azure, where it would otherwise cause the entire zip upload to fail
59    /// silently under the issue #127 best-effort swallow contract.
60    /// See issue #161.
61    kind: BackendKind,
62}
63
/// Name of the environment variable that overrides the per-ref lock TTL.
/// Its value is a whole number of seconds; see [`lock_ttl_from_env`].
pub(crate) const ENV_LOCK_TTL_SECONDS: &str = "GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS";
66
/// Fixed phrase embedded in the rejection sent when the remote ref is
/// not an ancestor of the pushed local ref.
///
/// This phrase is part of the user-facing contract. The shellspec suites
/// look for it to tell an ancestor mismatch apart from unrelated push
/// failures (network, permission, malformed URL, ...). The sentence around
/// the token may be reworded freely. The token itself must not change
/// unless every call site changes with it, including the spec files under
/// `spec/integration/*/force_push_spec.sh` and
/// `spec/live/*/force_push_spec.sh`.
pub(crate) const NOT_ANCESTOR_TOKEN: &str = "not ancestor";

/// Wire-format rejection for a push whose remote ref is not an ancestor
/// of `local_spec`.
///
/// The bundle and packchain engines both call this rather than inlining
/// their own `format!`. That keeps their output byte-identical and
/// guarantees it contains [`NOT_ANCESTOR_TOKEN`], which the spec suites
/// match against.
///
/// The result is a double-quoted sentence followed by a literal `?`, e.g.
/// `"remote ref is not ancestor of main."?`.
///
/// NOTE(review): the trailing `?` looks like it may have been copied from
/// the `<why>?` optional-argument notation in gitremote-helpers(7). Confirm
/// it is intended before touching it, since spec suites may match these
/// exact bytes.
pub(crate) fn not_ancestor_wire_message(local_spec: &str) -> String {
    format!(r#""remote ref is {} of {}."?"#, NOT_ANCESTOR_TOKEN, local_spec)
}

/// Wire-format rejection returned when a delete is refused because a
/// `PROTECTED#` marker exists under the ref.
///
/// Both the bundle engine ([`delete_remote_ref_under_lock`]) and the
/// packchain engine ([`crate::packchain`]'s `delete_remote_ref_packchain`)
/// use this constant, so git and other clients see identical bytes from
/// either engine. A single definition replaces the duplicated literals
/// (and the `const _` equality guard) that previously had to be kept in
/// sync by hand.
pub(crate) const DELETE_PROTECTION_MESSAGE: &str = r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#;
97
98/// Errors surfaced by the push path. These abort the helper — per-ref
99/// failures (multi-bundle, ancestor mismatch, lock contention, ...) are
100/// returned as [`PushOutcome::Error`] without aborting the batch.
101#[derive(Debug, thiserror::Error)]
102pub enum PushError {
103    /// `push <refspec>` line could not be parsed.
104    #[error("invalid push command {line:?}: expected `[+]<src>:<dst>`")]
105    Parse {
106        /// The offending line payload (after the `push ` prefix).
107        line: String,
108    },
109
110    /// Local rev-spec failed permissive ref-name validation.
111    #[error("invalid local ref-spec: {0:?}")]
112    InvalidLocalSpec(String),
113
114    /// Remote ref name is malformed.
115    #[error("invalid remote ref: {0}")]
116    RemoteRef(#[from] RefNameError),
117
118    /// SHA hex extracted from a stored bundle key was malformed.
119    #[error("invalid SHA in bundle key: {0}")]
120    Sha(#[from] ShaError),
121
122    /// Object-store transport / auth failure.
123    #[error("object-store error during push: {0}")]
124    Store(#[from] ObjectStoreError),
125
126    /// Local git operation failed (rev-parse, bundle, archive).
127    #[error("git error during push: {0}")]
128    Git(#[from] GitError),
129
130    /// Local I/O failure (tempdir, file read).
131    #[error("local I/O error during push: {0}")]
132    Io(#[from] std::io::Error),
133
134    /// Packchain-engine-specific failure surfaced by
135    /// [`crate::packchain::push::push_batch`]. Wrapped here so the
136    /// per-ref `error <ref>` arm in [`push_batch`] can render a
137    /// uniform wire line regardless of which engine produced the
138    /// error.
139    #[error("packchain engine error during push: {0}")]
140    Packchain(#[from] crate::packchain::PackchainError),
141}
142
/// Outcome of one push inside a batch.
///
/// The REPL writes each outcome to stdout as either `ok <ref>\n` or
/// `error <ref> <msg>\n`. See [`PushOutcome::to_protocol_line`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushOutcome {
    /// The push landed. `remote_ref` is echoed back to git so it can mark
    /// the local ref as updated.
    Ok {
        /// Destination ref in its unparsed wire form.
        remote_ref: String,
    },
    /// The push was refused. `message` is free-form text that is rendered
    /// after the ref name on the wire.
    Error {
        /// Destination ref the rejection refers to.
        remote_ref: String,
        /// Human-readable reason for the rejection.
        message: String,
    },
}

impl PushOutcome {
    /// Render `self` as exactly one protocol line, including the trailing
    /// newline.
    #[must_use]
    pub(crate) fn to_protocol_line(&self) -> String {
        match *self {
            Self::Ok { ref remote_ref } => format!("ok {remote_ref}\n"),
            Self::Error {
                ref remote_ref,
                ref message,
            } => format!("error {remote_ref} {message}\n"),
        }
    }
}
177
178/// Parsed `push` command line.
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub(crate) struct PushSpec {
181    /// `+` was present — the user requested a force push.
182    pub(crate) force: bool,
183    /// User-supplied local rev-spec. Empty means "delete the remote ref".
184    pub(crate) local_spec: String,
185    /// Strict, fully-qualified remote ref.
186    pub(crate) remote_ref: RefName,
187}
188
189/// Parse the payload of a `push <refspec>` line (the bytes after the
190/// `push ` prefix have already been stripped by the REPL).
191pub(crate) fn parse_push_args(args: &str) -> Result<PushSpec, PushError> {
192    let parse_err = || PushError::Parse {
193        line: args.to_owned(),
194    };
195    if args.is_empty() || args.contains(' ') {
196        return Err(parse_err());
197    }
198    let (local, remote) = args.split_once(':').ok_or_else(parse_err)?;
199    if remote.is_empty() {
200        return Err(parse_err());
201    }
202    let (force, local) = match local.strip_prefix('+') {
203        Some(rest) => (true, rest),
204        None => (false, local),
205    };
206    if !local.is_empty() && !is_valid_ref_name(local) {
207        return Err(PushError::InvalidLocalSpec(local.to_owned()));
208    }
209    let remote_ref = RefName::new(remote)?;
210    Ok(PushSpec {
211        force,
212        local_spec: local.to_owned(),
213        remote_ref,
214    })
215}
216
217/// Build the `<prefix>/<ref>/` listing prefix used by lock and bundle
218/// listings. Empty / absent prefix collapses to a bare `<ref>/`.
219///
220/// Thin typed wrapper over [`keys::ref_listing_prefix`] that takes the
221/// validated `RefName` newtype so helper-protocol call sites can't pass
222/// an unvalidated string. The shared helper is the single allocation
223/// point — `manage::doctor` and `manage::branch` reach it directly with
224/// already-validated `&str` ref paths.
225pub(crate) fn ref_listing_prefix(prefix: Option<&str>, remote_ref: &RefName) -> String {
226    keys::ref_listing_prefix(prefix, remote_ref.as_str())
227}
228
229/// Build the lock key: `<prefix>/<ref>/LOCK#.lock`.
230pub(crate) fn lock_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
231    format!("{}LOCK#.lock", ref_listing_prefix(prefix, remote_ref))
232}
233
234/// Build the zip-archive key: `<prefix>/<ref>/repo.zip`.
235fn archive_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
236    format!("{}repo.zip", ref_listing_prefix(prefix, remote_ref))
237}
238
239/// Build the HEAD key: `<prefix>/HEAD` (no slash when prefix is absent).
240pub(crate) fn head_key(prefix: Option<&str>) -> String {
241    keys::join(prefix, "HEAD")
242}
243
244/// Bundle-candidate filter: positive predicate — the final path segment
245/// must be `<sha>.bundle` (40 lower-hex chars + `.bundle`). All other
246/// per-ref siblings (`repo.zip`, `LOCK#.lock`, `PROTECTED#`, any
247/// hypothetical future artefact) are rejected by construction.
248///
249/// Earlier revisions filtered by full-key substring (`.zip`, `/LOCKS/`,
250/// `.lock`), which false-positively dropped real bundle keys whose ref
251/// name happened to contain those substrings — e.g.
252/// `refs/heads/v1.zip-rc1/<sha>.bundle` or
253/// `refs/heads/LOCKS-feature/x/<sha>.bundle`. A positive check on the
254/// final segment cannot misfire on ref-name content.
255fn is_bundle_candidate(key: &str) -> bool {
256    parse_remote_sha_from_key(key).is_some()
257}
258
259/// Returns every bundle object currently stored under `remote_ref`,
260/// filtered by [`is_bundle_candidate`] and excluding bundles named by
261/// any `<prefix>/gc/baseline-tomb-*.json` (issue #157). The store's
262/// listing prefix is `<prefix>/<ref>/` so sibling-ref keys don't leak
263/// in.
264///
265/// Tombstoned bundles are filtered here because the bundle engine
266/// uses the bundle-key listing as its source of truth for "ref →
267/// current SHA". A force-push that deferred the prior bundle leaves
268/// two `<sha>.bundle` keys under the ref; without the tombstone
269/// filter the next push's under-lock multi-bundle guard would refuse
270/// it. The tombstoned bundle stays readable at its original key for
271/// in-flight fetchers — only the listing path hides it.
272///
273/// Issue #165: `cached_hidden` lets the caller supply a tombstone set
274/// computed earlier in the same push (e.g. by [`prepare_push`]) and
275/// reused under the per-ref lock by [`perform_push_under_lock`]. The
276/// cached set is sound because all tombstone writers for a given ref
277/// (push's `defer_prior_bundle_via_tombstone`, compact's
278/// `tombstone_prior_baseline_bundle`, delete-branch's orphan path)
279/// run under the same per-ref lock — so no new tombstone for this
280/// ref can land between the pre-lock and under-lock calls within one
281/// `push_one` invocation. `None` keeps the original "fetch on demand"
282/// shape used by call sites that don't have a cache to share.
283async fn bundles_for_ref(
284    store: &dyn ObjectStore,
285    prefix: Option<&str>,
286    remote_ref: &RefName,
287    cached_hidden: Option<&HashSet<String>>,
288) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
289    let listing = ref_listing_prefix(prefix, remote_ref);
290    let metas = store.list(&listing).await?;
291    let bundles: Vec<ObjectMeta> = metas
292        .into_iter()
293        .filter(|m| is_bundle_candidate(&m.key))
294        .collect();
295    // Short-circuit the tombstone lookup when there is nothing that
296    // could be filtered. The common case (no force-push in the last
297    // grace window) pays no extra listing cost.
298    if bundles.is_empty() {
299        return Ok(bundles);
300    }
301    // Reuse the caller-supplied set when present; otherwise fetch
302    // on demand. The owned binding outlives the borrow so a single
303    // `&HashSet` covers both paths without an `Option`/`expect` dance.
304    let fetched;
305    let hidden: &HashSet<String> = if let Some(h) = cached_hidden {
306        h
307    } else {
308        fetched = tombstoned_bundle_keys(store, prefix).await?;
309        &fetched
310    };
311    Ok(bundles
312        .into_iter()
313        .filter(|m| !hidden.contains(&m.key))
314        .collect())
315}
316
317/// Returns `true` iff a `<prefix>/<ref>/PROTECTED#` marker exists.
318///
319/// Uses an exact-key `head` rather than a prefix `list`. `ObjectStore::list`
320/// is a byte-prefix match, so a `list("…/PROTECTED#")` would also return any
321/// future `PROTECTED#`-prefixed sibling key (e.g. `PROTECTED#audit`). The
322/// equality check here matches the semantics of the canonical
323/// `is_protected_marker_segment` snapshot-side helper and avoids
324/// spuriously blocking pushes or deletes on unprotected refs.
325pub(crate) async fn is_protected(
326    store: &dyn ObjectStore,
327    prefix: Option<&str>,
328    remote_ref: &RefName,
329) -> Result<bool, ObjectStoreError> {
330    let key = protected_marker_key(prefix, remote_ref);
331    match store.head(&key).await {
332        Ok(_) => Ok(true),
333        Err(ObjectStoreError::NotFound(_)) => Ok(false),
334        Err(e) => Err(e),
335    }
336}
337
338/// Build the `<prefix>/<ref>/PROTECTED#` key. Pulled out so the
339/// pre-delete protection probe ([`is_protected`]) and the post-sweep
340/// integrity verification ([`verify_no_orphan_protected_after_delete`])
341/// share one source of truth for the marker key shape.
342pub(crate) fn protected_marker_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
343    format!(
344        "{}{}",
345        ref_listing_prefix(prefix, remote_ref),
346        keys::PROTECTED_MARKER_SEGMENT,
347    )
348}
349
350/// Issue #151 defence-in-depth: after a delete-path sweep completes,
351/// confirm no `PROTECTED#` marker exists for `remote_ref`. The primary
352/// defence is the per-ref lock (#158, #159): every delete path acquires
353/// `<prefix>/<ref>/LOCK#.lock` before its under-lock listing and the
354/// sweep, and both `protect` and `unprotect` acquire the same key —
355/// so a marker landing between the under-lock listing and the sweep is
356/// mechanically impossible.
357///
358/// This helper is the belt to the lock's suspenders: if the marker is
359/// observed here, the contract was violated (a lock bypass, bucket-level
360/// inconsistency, or a misbehaving sibling tool), and we surface it as
361/// a structured `tracing::error!` so operators can investigate. The
362/// delete is NOT rolled back — the operator-visible "ref is gone"
363/// outcome stands; the orphan marker, if real, will block any future
364/// recreation of the same ref until `unprotect` removes it.
365///
366/// Uses one `head` (not a `list`) — cheap, exact-key, identical shape
367/// to [`is_protected`]. The caller passes the same `prefix` / `remote_ref`
368/// it used to compute the listing prefix so the marker key matches the
369/// sweep's namespace byte-for-byte.
370pub(crate) async fn verify_no_orphan_protected_after_delete(
371    store: &dyn ObjectStore,
372    prefix: Option<&str>,
373    remote_ref: &RefName,
374) {
375    let key = protected_marker_key(prefix, remote_ref);
376    match store.head(&key).await {
377        Ok(_) => {
378            tracing::error!(
379                key = %key,
380                remote_ref = %remote_ref.as_str(),
381                "delete path observed a PROTECTED# marker after sweep; the per-ref lock contract (#158, #159) should make this impossible — investigate for lock bypass or bucket-level inconsistency",
382            );
383        }
384        Err(ObjectStoreError::NotFound(_)) => {}
385        Err(e) => {
386            // The probe itself failed; this is not the integrity
387            // violation we are guarding against. Log at `debug!` so
388            // operators have the trail if a future incident needs it,
389            // but do not promote a transient `head` failure to an
390            // error — the delete already succeeded under the lock.
391            tracing::debug!(
392                key = %key,
393                error = %e,
394                "post-sweep PROTECTED# probe failed; cannot verify orphan-marker invariant for this delete",
395            );
396        }
397    }
398}
399
400/// Extract the SHA from a `<…>/<sha>.bundle` key. Returns `None` if the
401/// trailing segment does not match `[0-9a-f]{40}\.bundle`.
402///
403/// Charset/length validation goes through [`keys::is_valid_bundle_stem`]
404/// so the doctor's malformed-key detector and this parser agree on
405/// what counts as a well-formed stem. The defensive parse-error arm in
406/// [`prepare_push`] still calls this — although push's pre-lock listing
407/// filters malformed stems via [`is_bundle_candidate`] before reaching
408/// the arm, the guard remains in place so a future caller that bypasses
409/// the filter cannot silently treat a malformed key as a bundle.
410fn parse_remote_sha_from_key(key: &str) -> Option<Sha> {
411    let stem = parse_remote_sha_stem_from_key(key)?;
412    Sha::from_hex(stem).ok()
413}
414
415/// Extract the validated 40-lowercase-hex stem from a `<…>/<sha>.bundle`
416/// key without the `&str → Sha → String` round-trip
417/// [`parse_remote_sha_from_key`] incurs. Callers that need the schema
418/// [`Sha40`] (e.g. the tombstone writers) save one hex-encode + one
419/// re-validation by going through the borrowed stem directly.
420fn parse_remote_sha_stem_from_key(key: &str) -> Option<&str> {
421    let last = key.rsplit('/').next()?;
422    let stem = last.strip_suffix(".bundle")?;
423    if !keys::is_valid_bundle_stem(stem) {
424        return None;
425    }
426    Some(stem)
427}
428
429/// Read the lock TTL from `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS`,
430/// falling back to [`DEFAULT_LOCK_TTL_SECONDS`] if the env var is unset,
431/// unparseable, or zero. A zero TTL would make `acquire_lock` treat
432/// every held lock as instantly stale and defeat per-ref locking, so
433/// we mirror [`crate::packchain::gc::grace_hours_from_env`] and clamp
434/// it to the default.
435pub(crate) fn lock_ttl_from_env() -> Duration {
436    // Test-only: the `cfg`-gated block compiles out of release
437    // binaries (the `test-util` feature is dev-only and `cfg(test)`
438    // is set only when the crate itself is under test), so production
439    // callers pay nothing. In test builds, the read lock serialises
440    // against env-mutating tests that hold an `EnvGuard` for this
441    // key; `env_var_read_lock` returns `None` when the current thread
442    // itself holds the writer, so the env-mutating test does not
443    // deadlock on its own write lock.
444    #[cfg(any(test, feature = "test-util"))]
445    let _read = crate::test_util::env_var_read_lock(ENV_LOCK_TTL_SECONDS);
446    let secs = env::var(ENV_LOCK_TTL_SECONDS)
447        .ok()
448        .and_then(|s| s.parse::<u64>().ok())
449        .filter(|s| *s > 0)
450        .unwrap_or(DEFAULT_LOCK_TTL_SECONDS);
451    saturating_duration_seconds(secs)
452}
453
454/// Convert a `u64` seconds count to [`Duration`], saturating at
455/// [`i64::MAX`] (~292-billion-year sentinel TTL ceiling). Centralises
456/// the `i64::try_from(secs).unwrap_or(i64::MAX)` idiom that previously
457/// lived inline in both [`lock_ttl_from_env`] and
458/// [`crate::manage::compact::Compact::run_into`] (#221).
459#[must_use]
460pub(crate) fn saturating_duration_seconds(secs: u64) -> Duration {
461    Duration::seconds(i64::try_from(secs).unwrap_or(i64::MAX))
462}
463
464/// Same as [`lock_ttl_from_env`] but returns the value as `u64` seconds
465/// for callers that want the raw count rather than a `Duration`. The
466/// `expect` surfaces a future regression that loosens `lock_ttl_from_env`'s
467/// non-negative invariant instead of silently masking it.
468pub(crate) fn lock_ttl_from_env_seconds() -> u64 {
469    u64::try_from(lock_ttl_from_env().whole_seconds())
470        .expect("lock_ttl_from_env returns a non-negative seconds count")
471}
472
473/// Resolve a caller-supplied `Option<u64>` lock TTL to a concrete
474/// seconds value, applying the same zero-clamp as [`lock_ttl_from_env`].
475///
476/// Without this helper, `Compact::run_into` would accept a raw
477/// `Option<u64>` and feed `Some(0)` straight into the engine,
478/// bypassing the #112 clamp that protects against `acquire_lock`
479/// treating every held lock as instantly stale. A zero TTL defeats
480/// per-ref locking and corrupts concurrent pushes (issue #208).
481/// `Doctor::resolved_lock_ttl_seconds` deliberately does NOT route
482/// through this helper — doctor only compares lock ages and never
483/// acquires a lock, so an operator-explicit `--lock-ttl-seconds 0`
484/// is honoured as a "treat every lock as stale" request.
485///
486/// Resolution rules:
487///
488/// * `None` — defer to [`lock_ttl_from_env_seconds`] (env override or default).
489/// * `Some(0)` — defer to [`lock_ttl_from_env_seconds`] as well, so
490///   that an accidental zero from a CLI flag or default-constructed
491///   opts still picks up the operator's env override. This matches
492///   the shape of the existing env-side clamp.
493/// * `Some(n)` for `n > 0` — return `n` unchanged.
494///
495/// No upper bound is enforced: the existing env path accepts any
496/// `u64` and downstream `time::Duration::seconds` saturates at
497/// `i64::MAX` (~292 billion years), which is fine for a TTL ceiling.
498pub(crate) fn resolve_lock_ttl_seconds(opt: Option<u64>) -> u64 {
499    opt.filter(|&n| n > 0)
500        .unwrap_or_else(lock_ttl_from_env_seconds)
501}
502
503/// Handle to an acquired per-ref lock with a live heartbeat task.
504///
505/// Returned by [`acquire_lock`]. The guard owns a background tokio task
506/// that periodically re-PUTs the lock key (overwrite, not conditional)
507/// to refresh `last_modified` and keep stale-lock recovery from
508/// stealing a still-live lock from under a long-running critical
509/// section (issue #118).
510///
511/// ## Release
512///
513/// Call [`release_lock`] (or [`Self::release`]) to relinquish: this
514/// signals cooperative shutdown to the heartbeat, awaits its exit so
515/// any in-flight heartbeat PUT completes (or errors) on the server
516/// BEFORE the DELETE is issued, then deletes the lock key. Without
517/// the await-then-delete ordering, an in-flight heartbeat PUT could
518/// settle on the server after the DELETE and resurrect the lock key
519/// as an orphan (issue #150).
520///
521/// ## Drop
522///
523/// If a guard is dropped without an explicit release (panic, early
524/// return without the post-result match, etc.) the heartbeat is told
525/// to stop via the same shutdown signal and the join handle is
526/// aborted (we cannot `.await` in `Drop`). The lock key remains on
527/// the bucket and will be picked up by the next acquire attempt's
528/// stale-recovery path after `ttl` elapses (bounded by
529/// `ttl + heartbeat_interval` since the last heartbeat). Callers
530/// should still prefer explicit release so the lock is freed
531/// immediately.
532#[must_use = "lock guards must be released; dropping leaks the lock until TTL"]
533pub(crate) struct LockGuard {
534    /// Bucket key of the lock. `pub(crate)` so call sites can format
535    /// it into error messages (the old API surfaced `&str`).
536    lock_key: String,
537    /// Store handle, kept around so [`Self::release`] can delete the
538    /// key without callers re-passing it. Also given to the heartbeat
539    /// task via clone.
540    store: Arc<dyn ObjectStore>,
541    /// Cooperative shutdown signal. The heartbeat task watches this
542    /// receiver via [`tokio::select!`]: a `send(true)` wakes the loop
543    /// at any await point inside the `select!` (between ticks) AND is
544    /// checked synchronously before issuing each `put_bytes`, so no
545    /// PUT can be dispatched after [`Self::release`] has begun.
546    shutdown: watch::Sender<bool>,
547    /// Heartbeat task handle. `Some` while the guard owns a live
548    /// heartbeat; taken by release (awaited) or drop (aborted as a
549    /// fallback since `Drop` cannot await).
550    heartbeat: Option<JoinHandle<()>>,
551}
552
/// Longest time [`LockGuard::release`] waits for the heartbeat task to stop
/// on its own before falling back to `abort()`.
///
/// On shutdown the heartbeat leaves its `select!` straight away. It only
/// issues a PUT if it had already passed the shutdown re-check, so the
/// worst case is one in-flight `put_bytes` round trip.
///
/// Five seconds is far above any healthy RTT and far below any sensible
/// user-visible push timeout. If the network is wedged badly enough to
/// exceed it, the abort fallback keeps `release` from hanging. A possible
/// orphan key then simply falls back to the pre-fix behaviour: stale-lock
/// recovery after TTL.
const HEARTBEAT_JOIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(5_000);
565
566impl LockGuard {
567    /// Release the lock: cooperatively stop the heartbeat (awaiting
568    /// the task's exit so any in-flight PUT settles first), then
569    /// delete the lock key. `NotFound` is mapped to `Ok(())` (the
570    /// heartbeat may have raced a stale-recovery delete, or an
571    /// operator may have cleared the lock manually); every other
572    /// delete failure is propagated.
573    pub(crate) async fn release(mut self) -> Result<(), ObjectStoreError> {
574        // ORDER IS LOAD-BEARING: stop the heartbeat AND wait for its
575        // join handle BEFORE deleting the lock key. The previous
576        // implementation called `handle.abort()` and then DELETE
577        // synchronously, but `abort()` only cancels the future at its
578        // next await point — a `put_bytes` already in flight on the
579        // server completes regardless of cancellation and can settle
580        // AFTER our DELETE, resurrecting the lock key as an orphan
581        // (issue #150). Cooperative shutdown via `watch` + join-await
582        // makes the worst case "one in-flight PUT completes before
583        // DELETE" instead of "one in-flight PUT completes after
584        // DELETE".
585        self.stop_heartbeat().await;
586        delete_idempotent(self.store.as_ref(), &self.lock_key).await
587    }
588
589    /// Signal cooperative shutdown to the heartbeat task and await
590    /// its exit (bounded by [`HEARTBEAT_JOIN_TIMEOUT`]). On timeout
591    /// the handle is `abort()`-ed, restoring the pre-#150 behaviour
592    /// for the pathological case of a `put_bytes` hung past the
593    /// timeout.
594    async fn stop_heartbeat(&mut self) {
595        // `send` only fails when there are no receivers — i.e. the
596        // heartbeat task has already exited. Either way, there's no
597        // PUT we can still prevent, so we ignore the result.
598        let _ = self.shutdown.send(true);
599        let Some(mut handle) = self.heartbeat.take() else {
600            return;
601        };
602        // Borrow the handle into `timeout` so we can still call
603        // `abort()` on it after the wait fails.
604        if tokio::time::timeout(HEARTBEAT_JOIN_TIMEOUT, &mut handle)
605            .await
606            .is_err()
607        {
608            warn!(
609                key = %self.lock_key,
610                timeout_secs = HEARTBEAT_JOIN_TIMEOUT.as_secs(),
611                "lock heartbeat did not exit within join timeout; \
612                 falling back to abort (in-flight PUT may still race \
613                 the upcoming DELETE)",
614            );
615            handle.abort();
616        }
617    }
618}
619
620impl Drop for LockGuard {
621    fn drop(&mut self) {
622        // `Drop` cannot `.await`, so we cannot mirror the release
623        // path's "wait for in-flight PUT to settle" guarantee here.
624        // The best we can do is (a) signal shutdown so the heartbeat
625        // task exits at its next await point, and (b) abort the
626        // JoinHandle so a future PUT issued while we were dropping
627        // doesn't keep racing forever. The lock key may briefly
628        // outlive the holder; the stale-lock recovery path
629        // (`acquire_lock`) reclaims it after TTL.
630        let _ = self.shutdown.send(true);
631        if let Some(handle) = self.heartbeat.take() {
632            handle.abort();
633        }
634    }
635}
636
637/// Heartbeat interval: `ttl/3`, floored at one second so a pathological
638/// sub-second TTL doesn't busy-loop. We use `ttl/3` rather than `ttl/2`
639/// so a single missed heartbeat (transient network blip with
640/// `MissedTickBehavior::Delay`) still leaves margin before `age > ttl`:
641/// two consecutive misses are needed to push the lock past the
642/// staleness threshold (#118 follow-up).
643pub(crate) fn heartbeat_interval(ttl: Duration) -> std::time::Duration {
644    let secs = ttl.whole_seconds().max(3) / 3;
645    // `try_from` rather than `as`: we just clamped to ≥ 1, but
646    // `clippy::cast_sign_loss` insists on the explicit fallible cast.
647    let secs_u64 =
648        u64::try_from(secs).expect("ttl.whole_seconds().max(3) / 3 is always >= 1 (non-negative)");
649    std::time::Duration::from_secs(secs_u64)
650}
651
652/// Try to acquire the per-ref lock. Returns `Ok(Some(guard))` when the
653/// lock was taken (heartbeat is already running) and `Ok(None)` on
654/// contention (caller should surface a "lock held" error). On a stale
655/// lock (older than `ttl`, with no heartbeat from a live holder), the
656/// lock is deleted and the conditional `put_if_absent` is retried
657/// once.
658///
659/// The race window between `head` and the retry `put_if_absent` is
660/// inherent to non-conditional deletes — another client could acquire
661/// the lock between our delete and retry. We accept that race; the
662/// retry `put_if_absent` will return `Ok(false)` and the user will
663/// retry.
664///
665/// Heartbeat semantics (issue #118): a live holder refreshes the lock
666/// every `ttl/3` so the staleness check correctly excludes locks held
667/// by an in-flight critical section. A long-running [`compact`] no
668/// longer races a concurrent writer that wakes up after the original
669/// TTL elapses.
670///
671/// [`compact`]: crate::packchain::compact::compact
672pub(crate) async fn acquire_lock(
673    store: Arc<dyn ObjectStore>,
674    lock_key: &str,
675    ttl: Duration,
676    now: OffsetDateTime,
677) -> Result<Option<LockGuard>, ObjectStoreError> {
678    if store.put_if_absent(lock_key, Bytes::new()).await? {
679        return Ok(Some(spawn_lock_guard(store, lock_key.to_owned(), ttl)));
680    }
681    let meta = match store.head(lock_key).await {
682        Ok(m) => m,
683        // Lock vanished between put_if_absent and head — another client
684        // released it. Treat as contention; user retries.
685        Err(ObjectStoreError::NotFound(_)) => return Ok(None),
686        Err(e) => return Err(e),
687    };
688    let age = now - meta.last_modified;
689    if age <= ttl {
690        return Ok(None);
691    }
692    debug!(key = %lock_key, age_secs = age.whole_seconds(), "deleting stale lock");
693    delete_idempotent(store.as_ref(), lock_key).await?;
694    if store.put_if_absent(lock_key, Bytes::new()).await? {
695        Ok(Some(spawn_lock_guard(store, lock_key.to_owned(), ttl)))
696    } else {
697        Ok(None)
698    }
699}
700
/// Build a [`LockGuard`] for `lock_key` and spawn its heartbeat task.
///
/// The heartbeat re-PUTs an empty body at the lock key every
/// `heartbeat_interval(ttl)`. This is an unconditional overwrite, not a
/// conditional create, so the key's `last_modified` keeps advancing and
/// the stale-lock branch in [`acquire_lock`] keeps treating the lock as
/// held. Refresh failures are logged at `warn` and retried on the next
/// tick: a transient blip should not abort the critical section.
///
/// Shutdown: the task watches a [`watch::Receiver<bool>`].
/// - Sending `true` (as `release` and `Drop` do) makes the loop exit at
///   its next `select!`.
/// - The loop also exits if the sender is dropped, because `changed()`
///   then resolves to an error.
/// - The flag is re-checked synchronously after each tick and BEFORE
///   `put_bytes` is issued, so once shutdown has been signalled no *new*
///   PUT is dispatched.
///
/// This closes the issue #150 race, in which an `abort()`-cancelled task
/// had an in-flight `put_bytes` that settled on the server after
/// `release`'s DELETE. A PUT already in flight when shutdown is signalled
/// still runs to completion. NOTE(review): the release path therefore has
/// to wait for this task before deleting the key; that wait lives outside
/// this view — confirm.
fn spawn_lock_guard(store: Arc<dyn ObjectStore>, lock_key: String, ttl: Duration) -> LockGuard {
    let interval = heartbeat_interval(ttl);
    // The spawned task gets its own handles; the originals go into the guard.
    let task_store = Arc::clone(&store);
    let task_key = lock_key.clone();
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
    let handle = tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // A slow PUT delays the schedule instead of causing a burst of
        // catch-up refreshes.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // Skip the immediate first tick — the acquire just wrote the
        // key, so a refresh in the same millisecond would be wasted
        // bandwidth.
        tick.tick().await; // immediate
        loop {
            tokio::select! {
                // `biased` so shutdown is checked first on every
                // wake: a tick and a shutdown that fire on the same
                // poll must resolve to shutdown, otherwise the tick
                // arm could win and issue one final PUT after release
                // has begun.
                biased;
                // Fires on a send of `true` and also (as an `Err`) when
                // the sender is dropped; both end the heartbeat.
                _ = shutdown_rx.changed() => break,
                _ = tick.tick() => {}
            }
            // Re-check the shutdown flag synchronously between the
            // tick and the `put_bytes` await. Without this, a
            // `release` that signals shutdown AFTER `tick.tick()`
            // resolved but BEFORE we issued `put_bytes` would still
            // let the PUT through and race the DELETE — exactly the
            // issue #150 failure mode the `select!` alone does not
            // close.
            if *shutdown_rx.borrow() {
                break;
            }
            // Best-effort refresh: errors are logged and the next tick retries.
            match task_store
                .put_bytes(&task_key, Bytes::new(), PutOpts::default())
                .await
            {
                Ok(()) => debug!(key = %task_key, "lock heartbeat refreshed"),
                Err(e) => warn!(
                    key = %task_key,
                    error = %e,
                    "lock heartbeat refresh failed; will retry",
                ),
            }
        }
    });
    LockGuard {
        lock_key,
        store,
        shutdown: shutdown_tx,
        heartbeat: Some(handle),
    }
}
769
770/// Release a previously acquired per-ref lock. `NotFound` is mapped to
771/// `Ok(())` (another client or the TTL may have already cleaned it up);
772/// every other delete failure is propagated so the caller can surface it.
773///
774/// Heartbeat semantics: the guard's heartbeat task is aborted before
775/// the delete so a heartbeat refresh in flight cannot re-create the
776/// key after we've removed it.
777pub(crate) async fn release_lock(guard: LockGuard) -> Result<(), ObjectStoreError> {
778    guard.release().await
779}
780
781/// Idempotent delete: treats `NotFound` as success (another client may
782/// have raced ahead) but propagates every other error.
783pub(crate) async fn delete_idempotent(
784    store: &dyn ObjectStore,
785    key: &str,
786) -> Result<(), ObjectStoreError> {
787    match store.delete(key).await {
788        Ok(()) | Err(ObjectStoreError::NotFound(_)) => Ok(()),
789        Err(e) => Err(e),
790    }
791}
792
793/// Best-effort delete of the prior bundle once the new bundle is durable.
794///
795/// Issue #121: `perform_push_under_lock` writes the new bundle first
796/// (the durable commit) and then deletes the previous bundle. A
797/// non-`NotFound` error on the delete must NOT fail the push — the new
798/// bundle is already on the bucket, so reporting failure to the user
799/// is a lie about the remote state. The two-bundle state that results
800/// from the orphan trips the under-lock "multiple bundles" guard on
801/// the next push, which directs the operator to `doctor`; the warn
802/// log gives them the orphan key directly.
803///
804/// Mirrors `force_push_baseline_cleanup` in `packchain::push`
805/// (issue #113).
806///
807/// Issue #157: this is now the fallback path used only when the
808/// preferred deferred-via-tombstone path
809/// ([`defer_prior_bundle_via_tombstone`]) cannot be taken — namely a
810/// prior key whose stem does not parse as a [`Sha40`]. The on-bucket
811/// layout filter ([`is_bundle_candidate`]) already rejects malformed
812/// stems before they reach the push path, so this branch is reachable
813/// only via a future schema loosening or manual bucket tampering.
814async fn delete_prior_bundle_best_effort(
815    store: &dyn ObjectStore,
816    remote_ref: &RefName,
817    prior_key: &str,
818) {
819    if let Err(e) = delete_idempotent(store, prior_key).await {
820        warn!(
821            ref_path = %remote_ref.as_str(),
822            key = %prior_key,
823            error = %e,
824            "prior-bundle cleanup failed (new bundle already committed); \
825             orphan key left for manual cleanup",
826        );
827    }
828}
829
830/// Defer the prior-bundle delete to `gc sweep` by writing a baseline
831/// tombstone naming the old SHA.
832///
833/// Issue #157: synchronously deleting the prior bundle from the push
834/// path created a race against in-flight fetches. A client that ran
835/// `list` and saw `refs/heads/main -> old_sha` then issued
836/// `fetch old_sha refs/heads/main` against the EXACT
837/// `<prefix>/<ref>/<old_sha>.bundle` path — [`fetch_one`] does not
838/// re-list or consult HEAD. If a concurrent force-push synchronously
839/// deleted that key between the `list` advertisement and the fetcher's
840/// GET, fetch failed with `NotFound`.
841///
842/// The fix mirrors the packchain force-push tombstone path (issue
843/// #134, `force_push_baseline_cleanup` in `packchain::push`): write a
844/// `<prefix>/gc/baseline-tomb-*.json` record naming the prior SHA, and
845/// let `gc sweep` reclaim the bundle after the configured grace
846/// window. Until grace expires the prior bundle remains readable, so
847/// any fetcher that advertised it from a stale list can still
848/// complete.
849///
850/// The tombstone format and key namespace are shared with the
851/// packchain engine because `gc sweep` is engine-agnostic at the
852/// reclamation layer: [`sweep_one_baseline_tombstone`] derives the
853/// bundle key from `(ref_name, sha)` and runs the same live-state
854/// recheck regardless of which engine wrote the tombstone.
855///
856/// **Fallback semantics on tombstone write failure**: `gc sweep` only
857/// reclaims keys it has a tombstone for, so a tombstone PUT failure
858/// would otherwise orphan the prior bundle indefinitely. To preserve
859/// the issue #121 "two-bundle state surfaces via the doctor message"
860/// recovery path, this helper falls back to the synchronous
861/// best-effort delete from [`delete_prior_bundle_best_effort`] on
862/// either:
863///
864/// - a prior key whose stem does not parse as a [`Sha40`] (no way to
865///   name it in the tombstone body), or
866/// - a [`write_baseline_tombstone_best_effort`] PUT failure (the
867///   helper already logged a warn naming the orphan).
868async fn defer_prior_bundle_via_tombstone(
869    store: &dyn ObjectStore,
870    prefix: Option<&str>,
871    remote_ref: &RefName,
872    prior_key: &str,
873    local_sha: Sha,
874) {
875    let Some(prior_stem) = parse_remote_sha_stem_from_key(prior_key) else {
876        warn!(
877            ref_path = %remote_ref.as_str(),
878            key = %prior_key,
879            "prior bundle key does not parse as a valid SHA; falling back to synchronous delete",
880        );
881        delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
882        return;
883    };
884    // `parse_remote_sha_stem_from_key` already validated the stem
885    // against `is_valid_bundle_stem` (40 lowercase hex), so
886    // `Sha40::try_new(prior_stem)` is infallible in practice; surface
887    // a failure as the synchronous-delete fallback rather than
888    // panicking — a future tightening of `Sha40` validation must not
889    // crash the push. `local_sha` still routes through `to_string`
890    // because `Sha` (gix `ObjectId`) only exposes a binary form.
891    let (Ok(prior_sha40), Ok(local_sha40)) = (
892        Sha40::try_new(prior_stem),
893        Sha40::try_new(local_sha.to_string()),
894    ) else {
895        warn!(
896            ref_path = %remote_ref.as_str(),
897            key = %prior_key,
898            "prior or local sha failed Sha40 validation; falling back to synchronous delete",
899        );
900        delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
901        return;
902    };
903    let wrote = write_baseline_tombstone_best_effort(
904        store,
905        prefix,
906        remote_ref,
907        &prior_sha40,
908        &local_sha40,
909        "bundle-engine-force-push",
910    )
911    .await;
912    if wrote {
913        debug!(
914            ref_path = %remote_ref.as_str(),
915            key = %prior_key,
916            "prior bundle deferred to gc sweep via baseline tombstone",
917        );
918    } else {
919        // `write_baseline_tombstone_best_effort` already logged a warn
920        // naming the orphan key. Fall back to the synchronous delete
921        // so the next push's multi-bundle guard does not flag this
922        // case to the operator unnecessarily.
923        delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
924    }
925}
926
927/// Best-effort upload of the optional zip artifact once the new bundle
928/// is durable.
929///
930/// Issue #127: `perform_push_under_lock` writes the new bundle, `HEAD`,
931/// and `FORMAT` (the git-protocol contract for a successful push) and
932/// then uploads an optional `repo.zip` CodePipeline-side convenience
933/// artifact. A non-`NotFound` error on the zip upload must NOT fail
934/// the push — the bundle is already durable, so reporting failure is
935/// a lie about the remote state. The next push re-puts the zip at
936/// the same key (idempotent), so an operator who notices the warn log
937/// or wants the artifact re-uploaded simply re-pushes.
938///
939/// Mirrors [`delete_prior_bundle_best_effort`] (issue #121) and
940/// `force_push_baseline_cleanup` in `packchain::push` (issue #113):
941/// log at warn with `ref_path`, `key`, and `error`, and return
942/// without propagating.
943async fn upload_zip_artifact_best_effort(
944    store: &dyn ObjectStore,
945    remote_ref: &RefName,
946    zip_dest: &str,
947    archive_path: &Path,
948    opts: PutOpts,
949) {
950    if let Err(e) = store.put_path(zip_dest, archive_path, opts).await {
951        warn!(
952            ref_path = %remote_ref.as_str(),
953            key = %zip_dest,
954            error = %e,
955            "zip artifact upload failed (bundle already committed); \
956             retry the push to re-upload the zip at the same key",
957        );
958    }
959}
960
961/// Drive a batch of `push` commands sequentially.
962///
963/// Each command is parsed, executed under its own per-ref lock, and
964/// produces one [`PushOutcome`]. Catastrophic errors (transport,
965/// malformed protocol input) abort the batch and bubble out as
966/// [`PushError`]; per-ref failures are encoded as
967/// [`PushOutcome::Error`] and the batch continues.
968pub(crate) async fn push_batch(
969    ctx: &super::BatchCtx,
970    kind: BackendKind,
971    zip: bool,
972    engine: StorageEngine,
973    cmds: Vec<String>,
974) -> Result<Vec<PushOutcome>, PushError> {
975    if cmds.is_empty() {
976        return Ok(Vec::new());
977    }
978    debug!(count = cmds.len(), "processing push batch");
979
980    let config = PushConfig {
981        zip,
982        engine,
983        ttl: lock_ttl_from_env(),
984        kind,
985    };
986    let mut outcomes = Vec::with_capacity(cmds.len());
987
988    for cmd in cmds {
989        // `parse_push_args` failures are catastrophic: a malformed `push`
990        // line means we cannot trust subsequent commands. Abort the batch.
991        let spec = parse_push_args(&cmd)?;
992        // Capture the ref name before `push_one` consumes the spec so we
993        // can still render an `error <ref> ...` line if the call fails.
994        let remote_ref_str = spec.remote_ref.as_str().to_owned();
995        let outcome = match push_one(
996            Arc::clone(&ctx.store),
997            ctx.prefix.as_deref(),
998            ctx.repo_dir.as_path(),
999            &config,
1000            OffsetDateTime::now_utc(),
1001            spec,
1002        )
1003        .await
1004        {
1005            Ok(o) => o,
1006            // Per-push operational failures (transport, local git, local I/O,
1007            // malformed remote bundle SHA) become `error <ref>` lines so the
1008            // batch can continue. Without this, a single 5xx blip in the
1009            // middle of a multi-ref push would silently drop the outcome
1010            // lines for already-completed pushes and leave git's local
1011            // ref-tracking inconsistent with the remote.
1012            Err(e)
1013                if matches!(
1014                    e,
1015                    PushError::Store(_) | PushError::Git(_) | PushError::Io(_) | PushError::Sha(_)
1016                ) =>
1017            {
1018                let chain = full_error_chain(&e);
1019                warn!(ref = %remote_ref_str, error = %chain, "push ref failed");
1020                PushOutcome::Error {
1021                    remote_ref: remote_ref_str,
1022                    message: format!(r#""{chain}"?"#),
1023                }
1024            }
1025            Err(e) => return Err(e),
1026        };
1027        outcomes.push(outcome);
1028    }
1029    Ok(outcomes)
1030}
1031
1032/// Render a [`PushError`] as a colon-separated chain so the
1033/// `error <ref>` wire line and the stderr log carry every level of
1034/// causal context.
1035///
1036/// `PushError`'s `thiserror` formats inline the immediate source via
1037/// `{0}` / `{source}`, and some of those sources (notably
1038/// [`ObjectStoreError::Network`], whose own Display embeds its boxed
1039/// inner error) themselves inline a further level — so a naive
1040/// chain-walk would produce `"object-store error during push: network
1041/// error: dns failure: dns failure"`. [`super::append_source_chain`]
1042/// dedups any level whose text is already at the tail of `msg`, so
1043/// the rendered chain is uniform across `Store`, `Git`, `Io`, and `Sha`
1044/// variants without per-variant special-casing.
1045fn full_error_chain(err: &PushError) -> String {
1046    let mut msg = err.to_string();
1047    super::append_source_chain(&mut msg, err);
1048    msg
1049}
1050
/// Per-push failures found while probing the local repository.
///
/// These never abort the batch: [`prepare_push`] turns each one into a
/// [`PushOutcome::Error`] wire message for the affected ref.
enum GitProbeError {
    /// The user's `local_spec` could not be resolved in the local repo.
    LocalRefNotFound,
    /// The existing remote bundle's commit is not an ancestor of
    /// `local_sha`, and the push was not forced.
    NotAncestor,
}
1059
1060/// Local git work that must run synchronously because `gix::Repository`
1061/// is `!Sync` and cannot cross `.await` points without making the
1062/// surrounding future `!Send`.
1063struct LocalGit {
1064    /// Resolved commit OID for the user's `local_spec`.
1065    local_sha: Sha,
1066    /// Working directory passed to `bundle::create`.
1067    cwd: PathBuf,
1068    /// On the zip path: archive on disk + metadata for the upload.
1069    /// `None` on the regular push path. The `TempDir` keeps the file
1070    /// alive until the async caller reads its bytes.
1071    zip_artifacts: Option<ZipArtifacts>,
1072    /// Whether the pre-lock-listed remote bundle's SHA was an ancestor
1073    /// of `local_sha`. `true` when there was no pre-existing bundle.
1074    /// Stashed so [`perform_push_under_lock`] can decide, under the
1075    /// per-ref lock, whether a force-push against a now-`PROTECTED#`
1076    /// ref is a safe fast-forward (issue #129).
1077    pre_existing_was_ancestor: bool,
1078}
1079
1080struct ZipArtifacts {
1081    archive_path: PathBuf,
1082    short_sha: String,
1083    commit_msg: String,
1084    /// Owned tempdir that backs `archive_path`; dropped after upload.
1085    _tempdir: tempfile::TempDir,
1086}
1087
1088/// All state computed before the per-ref lock is acquired.
1089///
1090/// Passed to [`perform_push_under_lock`] so the argument count stays within
1091/// the clippy budget and the two phases of [`push_one`] have a clean
1092/// boundary: pre-lock work (protect check, bundle listing, local git,
1093/// bundle creation) vs. under-lock work (re-list, upload, HEAD/FORMAT
1094/// init, old-bundle deletion).
1095struct PushReadyState {
1096    remote_ref: RefName,
1097    local_sha: Sha,
1098    /// Key of the pre-existing bundle (if any). Checked inside the lock
1099    /// to detect concurrent pushes (stale-remote guard).
1100    pre_existing: Option<String>,
1101    bundle_path: PathBuf,
1102    zip_artifacts: Option<ZipArtifacts>,
1103    engine: StorageEngine,
1104    /// User's `--force` intent. Carried into the lock window so the
1105    /// `PROTECTED#` check can run *under* the lock and close the TOCTOU
1106    /// window between the pre-lock check and the lock acquisition
1107    /// (issue #129).
1108    force: bool,
1109    /// Whether the pre-lock-listed bundle's SHA was an ancestor of
1110    /// `local_sha` (always `true` when there was no pre-existing
1111    /// bundle). Used together with `force` and the under-lock
1112    /// protection re-check to decide whether to reject a force-push
1113    /// that became protected mid-flight.
1114    pre_existing_was_ancestor: bool,
1115    /// Original local refspec, kept so the under-lock `NotAncestor` error
1116    /// message renders the same text the pre-lock arm would have used.
1117    local_spec: String,
1118    /// Issue #165: tombstone set captured during [`prepare_push`] so the
1119    /// under-lock [`bundles_for_ref`] call inside
1120    /// [`perform_push_under_lock`] does not re-list `<prefix>/gc/` and
1121    /// re-fetch every baseline tombstone. Sound because all tombstone
1122    /// writers for this ref serialize through the same per-ref lock —
1123    /// no new tombstone for this ref can land in the pre-lock /
1124    /// under-lock window.
1125    hidden_bundles: HashSet<String>,
1126    /// Keeps the temp directory (and `bundle_path`) alive until the bundle
1127    /// is uploaded inside `perform_push_under_lock`.
1128    _temp_dir: tempfile::TempDir,
1129}
1130
1131/// Outcome of [`prepare_push`]: one of three paths into [`push_one`]'s
1132/// lock window.
1133///
1134/// - [`PrepareOutcome::Ready`] — pre-lock work completed; caller must
1135///   acquire the per-ref lock and call [`perform_push_under_lock`].
1136/// - [`PrepareOutcome::Delete`] — delete refspec (`:<ref>`); caller must
1137///   acquire the per-ref lock and call [`delete_remote_ref_under_lock`].
1138///   Issue #133: the listing and sweep MUST run under the lock so a
1139///   concurrent push that lands a new bundle cannot turn the delete
1140///   into a silent false success.
1141/// - [`PrepareOutcome::Done`] — outcome already decided pre-lock
1142///   (multi-bundle corruption, ancestry failure, …); caller returns it
1143///   directly without taking the lock.
1144enum PrepareOutcome {
1145    // `PushReadyState` is the largest variant (paths, temp dir guard,
1146    // hidden-bundle hash set added in #165); boxing keeps the enum's
1147    // discriminant compact regardless of variant.
1148    Ready(Box<PushReadyState>),
1149    Delete { remote_ref: RefName, zip: bool },
1150    Done(PushOutcome),
1151}
1152
1153/// Open the repo, resolve `local_sha`, compute ancestry of the
1154/// pre-existing remote bundle, and (for the zip variant) build the
1155/// archive synchronously. The `Repository` handle is dropped before
1156/// this returns so the caller's `Future` can stay `Send`. Archive
1157/// bytes are NOT read here — the async caller does that with
1158/// `tokio::fs::read` to avoid blocking the runtime on file I/O.
1159///
1160/// Ancestry is always computed and returned via
1161/// [`LocalGit::pre_existing_was_ancestor`]; this function only emits
1162/// [`GitProbeError::NotAncestor`] for the non-force case. Force pushes
1163/// defer the ancestry-vs-protection decision to
1164/// [`perform_push_under_lock`] so a concurrent `protect` cannot race
1165/// the pre-lock check (issue #129).
1166fn local_git_work(
1167    repo_dir: &Path,
1168    local_spec: &str,
1169    pre_existing_sha: Option<Sha>,
1170    force_push: bool,
1171    zip: bool,
1172) -> Result<Result<LocalGit, GitProbeError>, GitError> {
1173    let repo = gix::open(repo_dir)?;
1174    let cwd = repo.workdir().unwrap_or_else(|| repo.git_dir()).to_owned();
1175
1176    let Ok(local_sha) = git::branch::resolve(&repo, local_spec) else {
1177        return Ok(Err(GitProbeError::LocalRefNotFound));
1178    };
1179
1180    let pre_existing_was_ancestor = match pre_existing_sha {
1181        None => true,
1182        Some(remote_sha) => git::is_ancestor(&repo, remote_sha, local_sha)?,
1183    };
1184    if !force_push && !pre_existing_was_ancestor {
1185        return Ok(Err(GitProbeError::NotAncestor));
1186    }
1187
1188    let zip_artifacts = if zip {
1189        let tempdir = tempfile::Builder::new()
1190            .prefix("git_remote_object_store_archive_")
1191            .tempdir()?;
1192        let archive_path = git::archive(&repo, tempdir.path(), local_spec)?;
1193        let commit_msg = git::last_commit_message(&repo).unwrap_or_default();
1194        let sha_hex = local_sha.to_string();
1195        let short_sha = sha_hex[..8].to_owned();
1196        Some(ZipArtifacts {
1197            archive_path,
1198            short_sha,
1199            commit_msg,
1200            _tempdir: tempdir,
1201        })
1202    } else {
1203        None
1204    };
1205
1206    drop(repo);
1207    Ok(Ok(LocalGit {
1208        local_sha,
1209        cwd,
1210        zip_artifacts,
1211        pre_existing_was_ancestor,
1212    }))
1213}
1214
1215/// Perform all pre-lock work for a push: resolve the local ref, check
1216/// ancestry, list the remote bundles, and create the local bundle file.
1217/// Returns [`PrepareOutcome::Done`] for cases that are already resolved
1218/// (delete-refspec, protection check, multiple-bundle error, ancestry
1219/// failure) or [`PrepareOutcome::Ready`] when the caller should proceed
1220/// to acquire the per-ref lock and call [`perform_push_under_lock`].
1221async fn prepare_push(
1222    store: &dyn ObjectStore,
1223    prefix: Option<&str>,
1224    repo_dir: &Path,
1225    config: &PushConfig,
1226    spec: PushSpec,
1227) -> Result<PrepareOutcome, PushError> {
1228    let PushSpec {
1229        force,
1230        local_spec,
1231        remote_ref,
1232    } = spec;
1233    let remote_ref_str = remote_ref.as_str().to_owned();
1234
1235    if local_spec.is_empty() {
1236        // Issue #133: do NOT list / sweep here. The bundle engine's
1237        // delete must run INSIDE the per-ref lock so a concurrent push
1238        // landing a new bundle between our listing and our deletion
1239        // cannot produce a silent false success. Defer to
1240        // [`delete_remote_ref_under_lock`] in [`push_one`].
1241        return Ok(PrepareOutcome::Delete {
1242            remote_ref,
1243            zip: config.zip,
1244        });
1245    }
1246
1247    // Issue #129: do NOT call `is_protected` here. A pre-lock check
1248    // races against a concurrent `protect`: the check could pass and a
1249    // `PROTECTED#` marker could land before we acquire the per-ref
1250    // lock, letting a force-push overwrite a now-protected ref. The
1251    // check is performed *under* the lock in
1252    // [`perform_push_under_lock`] instead. Pre-lock we just respect the
1253    // user's `--force` intent; the `local_git_work` probe still returns
1254    // ancestry info via [`LocalGit::pre_existing_was_ancestor`] so the
1255    // under-lock arm can render the same NotAncestor message a
1256    // protection-demoted non-force push would have produced.
1257    let force_push = force;
1258    debug!(local = %local_spec, remote = %remote_ref, force_push, "push");
1259
1260    // Issue #165: compute the tombstone set once here and reuse it under
1261    // the per-ref lock in [`perform_push_under_lock`]. The lock ensures
1262    // no concurrent writer can publish a tombstone for this ref between
1263    // the two `bundles_for_ref` calls. The cached set is unused on the
1264    // multi-bundle / delete / parse-error early-return paths — a wasted
1265    // listing — but each path already terminates the push, so the
1266    // ~one-call overhead is bounded.
1267    let hidden_bundles = tombstoned_bundle_keys(store, prefix).await?;
1268    let pre_bundles = bundles_for_ref(store, prefix, &remote_ref, Some(&hidden_bundles)).await?;
1269    if pre_bundles.len() > 1 {
1270        return Ok(PrepareOutcome::Done(PushOutcome::Error {
1271            remote_ref: remote_ref_str,
1272            message:
1273                r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
1274                    .to_owned(),
1275        }));
1276    }
1277    let pre_existing = pre_bundles.into_iter().next().map(|m| m.key);
1278
1279    let pre_existing_sha = match pre_existing.as_deref() {
1280        None => None,
1281        Some(key) => {
1282            let Some(s) = parse_remote_sha_from_key(key) else {
1283                return Ok(PrepareOutcome::Done(PushOutcome::Error {
1284                    remote_ref: remote_ref_str,
1285                    message: format!(
1286                        r#""unable to parse remote bundle key {key:?}; run git-remote-object-store doctor to fix."?"#,
1287                    ),
1288                }));
1289            };
1290            Some(s)
1291        }
1292    };
1293
1294    // Sync gix work (rev-parse / ancestor / archive) runs in a
1295    // dedicated scope so the !Sync `Repository` is dropped before any
1296    // .await — keeps `push_batch`'s future `Send`.
1297    let probe = local_git_work(
1298        repo_dir,
1299        &local_spec,
1300        pre_existing_sha,
1301        force_push,
1302        config.zip,
1303    )?;
1304    let local = match probe {
1305        Ok(local) => local,
1306        Err(GitProbeError::LocalRefNotFound) => {
1307            return Ok(PrepareOutcome::Done(PushOutcome::Error {
1308                remote_ref: remote_ref_str,
1309                message: format!(r#""{local_spec} not found"?"#),
1310            }));
1311        }
1312        Err(GitProbeError::NotAncestor) => {
1313            return Ok(PrepareOutcome::Done(PushOutcome::Error {
1314                remote_ref: remote_ref_str,
1315                message: not_ancestor_wire_message(&local_spec),
1316            }));
1317        }
1318    };
1319
1320    let temp_dir = tempfile::Builder::new()
1321        .prefix("git_remote_object_store_push_")
1322        .tempdir()?;
1323    let bundle_path =
1324        git::bundle_at(&local.cwd, temp_dir.path(), local.local_sha, &local_spec).await?;
1325
1326    Ok(PrepareOutcome::Ready(Box::new(PushReadyState {
1327        remote_ref,
1328        local_sha: local.local_sha,
1329        pre_existing,
1330        bundle_path,
1331        zip_artifacts: local.zip_artifacts,
1332        engine: config.engine,
1333        force,
1334        pre_existing_was_ancestor: local.pre_existing_was_ancestor,
1335        local_spec,
1336        hidden_bundles,
1337        _temp_dir: temp_dir,
1338    })))
1339}
1340
1341/// Execute one push or delete: prepare, lock, do work, release.
1342///
1343/// Both the upload path ([`perform_push_under_lock`]) and the delete
1344/// path ([`delete_remote_ref_under_lock`]) run inside the SAME per-ref
1345/// lock window with identical acquire/release accounting:
1346///
1347/// - `acquire_lock` returning `None` → emit the standard
1348///   "failed to acquire ref lock" wire error without doing work.
1349/// - `release_lock` failing AFTER successful work → downgrade the
1350///   outcome to an `Error` so the operator is alerted to a potentially
1351///   dangling lock.
1352/// - A genuine work error takes priority over a release failure — the
1353///   release error is logged but never masks the original failure.
1354///
1355/// Issue #133: putting the delete path under this same window closes
1356/// the race where a concurrent push could land a new bundle between a
1357/// pre-lock listing and a pre-lock sweep, producing a silent false
1358/// success.
1359async fn push_one(
1360    store: Arc<dyn ObjectStore>,
1361    prefix: Option<&str>,
1362    repo_dir: &Path,
1363    config: &PushConfig,
1364    now: OffsetDateTime,
1365    spec: PushSpec,
1366) -> Result<PushOutcome, PushError> {
1367    let (remote_ref_str, work): (String, UnderLockWork) =
1368        match prepare_push(store.as_ref(), prefix, repo_dir, config, spec).await? {
1369            PrepareOutcome::Done(o) => return Ok(o),
1370            PrepareOutcome::Ready(state) => (
1371                state.remote_ref.as_str().to_owned(),
1372                // `state` is already `Box<PushReadyState>` (`PrepareOutcome::Ready`
1373                // is boxed to keep the enum's largest variant compact); reuse the
1374                // existing allocation instead of re-boxing.
1375                UnderLockWork::Push(state),
1376            ),
1377            PrepareOutcome::Delete { remote_ref, zip } => (
1378                remote_ref.as_str().to_owned(),
1379                UnderLockWork::Delete { remote_ref, zip },
1380            ),
1381        };
1382
1383    let lock = match &work {
1384        UnderLockWork::Push(state) => lock_key(prefix, &state.remote_ref),
1385        UnderLockWork::Delete { remote_ref, .. } => lock_key(prefix, remote_ref),
1386    };
1387
1388    let Some(guard) = acquire_lock(Arc::clone(&store), &lock, config.ttl, now).await? else {
1389        return Ok(PushOutcome::Error {
1390            remote_ref: remote_ref_str,
1391            message: format!(
1392                // `push_one` covers BOTH the Push and Delete arms under the
1393                // same per-ref lock, so the contention message names both
1394                // — mirroring the packchain delete path's wording. A
1395                // delete-arm caller previously saw a misleading
1396                // "may be pushing" hint here.
1397                r#""failed to acquire ref lock at {lock}. Another client may be pushing or deleting. If this persists beyond {}s, run git-remote-object-store doctor to inspect and optionally clear stale locks."?"#,
1398                config.ttl.whole_seconds(),
1399            ),
1400        });
1401    };
1402
1403    let result = match work {
1404        UnderLockWork::Push(state) => {
1405            perform_push_under_lock(store.as_ref(), prefix, config.kind, *state).await
1406        }
1407        UnderLockWork::Delete { remote_ref, zip } => {
1408            delete_remote_ref_under_lock(store.as_ref(), prefix, &remote_ref, zip, &lock).await
1409        }
1410    };
1411    let release_result = release_lock(guard).await;
1412
1413    match (&result, release_result) {
1414        (Ok(PushOutcome::Ok { .. }), Err(e)) => {
1415            warn!(key = %lock, error = %e, "failed to release lock");
1416            Ok(PushOutcome::Error {
1417                remote_ref: remote_ref_str,
1418                message: format!(
1419                    r#""failed to release lock. You may need to manually remove the lock {lock} from the server or use git-remote-object-store doctor to fix."?"#,
1420                ),
1421            })
1422        }
1423        (_, Err(e)) => {
1424            warn!(key = %lock, error = %e, "lock release failed (work already errored)");
1425            result
1426        }
1427        _ => result,
1428    }
1429}
1430
1431/// The work to perform inside the per-ref lock acquired by [`push_one`].
1432///
1433/// `PushReadyState` is significantly larger than the `Delete` variant
1434/// (paths, the temp dir guard, the captured refspec); boxing it keeps
1435/// the [`UnderLockWork`] discriminant compact regardless of variant.
1436enum UnderLockWork {
1437    Push(Box<PushReadyState>),
1438    Delete { remote_ref: RefName, zip: bool },
1439}
1440
/// Perform the upload half of a push while the caller ([`push_one`])
/// holds the per-ref lock. Split out so the lock release in the caller
/// is unconditional.
///
/// Steps, in order (the order matters: every check runs before the
/// first write):
///
/// 1. Force-push protection check (issue #129).
/// 2. Re-list the ref's bundles under the lock, reusing the tombstone
///    set captured by `prepare_push` (issue #165).
/// 3. Stale-remote guard: the under-lock bundle must match the pre-lock
///    snapshot.
/// 4. Upload the bundle to `<prefix>/<ref>/<sha>.bundle`.
/// 5. Write `HEAD` if absent.
/// 6. Write the `FORMAT` key if absent.
/// 7. If a different prior bundle existed, defer its delete via a
///    baseline tombstone (issue #157).
/// 8. Optionally upload `repo.zip` via
///    `upload_zip_artifact_best_effort`.
///
/// `kind` decides whether the zip archive carries the CodePipeline
/// user-metadata header (S3 only, issue #161).
///
/// Returns `Ok(PushOutcome::Ok)` on success. Protocol-level refusals
/// are returned as `Ok(PushOutcome::Error)`:
/// - protected ref with a non-fast-forward force push;
/// - multiple bundles for the ref;
/// - stale remote.
///
/// Store failures in steps 2 and 4–6 propagate as `Err(PushError)`
/// via `?`. Steps 7 and 8 are awaited without `?`; presumably they
/// handle their own failures — see those helpers.
async fn perform_push_under_lock(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    kind: BackendKind,
    state: PushReadyState,
) -> Result<PushOutcome, PushError> {
    let PushReadyState {
        remote_ref,
        local_sha,
        pre_existing,
        bundle_path,
        zip_artifacts,
        engine,
        force,
        pre_existing_was_ancestor,
        local_spec,
        hidden_bundles,
        // Bound by name, so the prepare-phase temp-dir guard is held
        // until this function returns. That keeps the files it owns
        // (presumably `bundle_path` and the zip archive) on disk
        // through the uploads below.
        _temp_dir,
    } = state;
    let remote_ref_str = remote_ref.as_str().to_owned();

    // Issue #129: under-lock force-push protection check. A pre-lock
    // check would race against a concurrent `protect`; running here —
    // after `acquire_lock`, before any writes — closes that TOCTOU
    // window. The historical "protected ref + force" semantic is
    // "demote to non-force": if the local SHA would have been a
    // fast-forward against the pre-lock-listed remote (already pinned
    // by the stale-remote guard below), let the push through; if not,
    // emit the same NotAncestor wire error a pre-lock non-force probe
    // would have produced. The `is_protected` helper uses `head`, not
    // `list`, so this adds one cheap key probe and does not duplicate
    // any existing under-lock listing.
    if force && !pre_existing_was_ancestor && is_protected(store, prefix, &remote_ref).await? {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: not_ancestor_wire_message(&local_spec),
        });
    }

    // Issue #165: reuse the tombstone set captured in `prepare_push`.
    // The per-ref lock prevents any new tombstone for this ref from
    // landing between then and now.
    let current = bundles_for_ref(store, prefix, &remote_ref, Some(&hidden_bundles)).await?;
    if current.len() > 1 {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#.to_owned(),
        });
    }
    let current_key = current.into_iter().next().map(|m| m.key);
    // Compare the pre-lock snapshot to the under-lock reality. Of the
    // five (pre / under) cases, only the first two may proceed:
    //   None / None           — happy path, no bundle existed before or now.
    //   Some(K) / Some(K)     — happy path, same bundle (e.g. force-push that
    //                           re-uploads against the same SHA).
    //   Some(A) / Some(B)     — concurrent push replaced our snapshot's
    //                           bundle. Reject: we'd silently overwrite it.
    //   None / Some           — concurrent push created a bundle after our
    //                           pre-lock list. Reject.
    //   Some / None           — concurrent delete (`git push :ref`) removed
    //                           our snapshot's bundle. Reject.
    // Without this guard a concurrent writer between the pre-lock list and
    // the lock acquisition would be silently overwritten.
    if pre_existing.as_deref() != current_key.as_deref() {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: r#""stale remote. Please fetch and retry."?"#.to_owned(),
        });
    }

    let bundle_dest = keys::bundle_key(prefix, &remote_ref, local_sha);
    // Stat once for "X / total" formatting in progress logs. The
    // multipart upload re-stats `bundle_path` to size the part plan;
    // a brief race here would surface as the log showing a stale
    // total — never as wrong bytes — so we tolerate it. If the stat
    // fails (vanishingly rare on a tempdir we just wrote), the total
    // becomes `None`. The progress log lines then simply omit the
    // `total` field rather than aborting the push.
    let bundle_total = tokio::fs::metadata(&bundle_path)
        .await
        .map(|m| m.len())
        .ok();
    let bundle_opts = PutOpts {
        progress: Some(bundle_progress_sink(&bundle_dest, bundle_total)),
        ..PutOpts::default()
    };
    store
        .put_path(&bundle_dest, &bundle_path, bundle_opts)
        .await?;

    // HEAD bootstrap: write only if absent. Single round-trip via
    // put_if_absent — we don't care about the boolean (existing HEAD is
    // intentionally preserved).
    let head = head_key(prefix);
    store
        .put_if_absent(
            &head,
            Bytes::copy_from_slice(remote_ref.as_str().as_bytes()),
        )
        .await?;

    // Lock in the storage engine on the first push. `put_if_absent` makes
    // concurrent first-push races safe: only one write lands and the loser
    // is a no-op. The boolean result is intentionally ignored: an existing
    // FORMAT key was already validated at connect time by `backend::build`.
    let format_key = keys::join(prefix, "FORMAT");
    store
        .put_if_absent(&format_key, Bytes::from_static(engine.as_str().as_bytes()))
        .await?;

    // Only a DIFFERENT prior bundle needs retiring. A same-key
    // re-upload has just overwritten `bundle_dest` in place.
    if let Some(prev) = current_key
        && prev != bundle_dest
    {
        // Issue #157: defer the prior-bundle delete to `gc sweep` via a
        // baseline tombstone instead of deleting it synchronously. A
        // concurrent fetcher that already advertised the prior SHA via
        // `list` would otherwise race the GET against this DELETE; the
        // tombstone gives `fetch_one` the grace window it needs.
        defer_prior_bundle_via_tombstone(store, prefix, &remote_ref, &prev, local_sha).await;
    }

    if let Some(artifacts) = zip_artifacts {
        let zip_dest = archive_key(prefix, &remote_ref);
        let zip_total = tokio::fs::metadata(&artifacts.archive_path)
            .await
            .map(|m| m.len())
            .ok();
        // Issue #161: the `codepipeline-artifact-revision-summary` user
        // metadata is only meaningful on S3 (AWS CodePipeline consumes it
        // as `x-amz-meta-codepipeline-artifact-revision-summary`). Azure
        // metadata names must be valid C# identifiers (no hyphens), so
        // attaching the header on the Azure path causes the entire blob
        // upload to fail with `InvalidMetadata` — and the issue #127
        // best-effort swallow then hides the failure, leaving every
        // `?zip=1` push silently missing `repo.zip`. Omit the header
        // outside S3 so the zip artifact lands successfully.
        let user_metadata = match kind {
            BackendKind::S3 => vec![(
                "codepipeline-artifact-revision-summary".to_owned(),
                sanitize_metadata_value(&artifacts.commit_msg),
            )],
            BackendKind::Azure => Vec::new(),
        };
        let opts = PutOpts {
            content_disposition: Some(format!(
                "attachment; filename=repo-{}.zip",
                artifacts.short_sha
            )),
            user_metadata,
            progress: Some(bundle_progress_sink(&zip_dest, zip_total)),
        };
        // Awaited without `?`: a failed archive upload does not fail
        // the push (best-effort per the helper's name and issue #127).
        upload_zip_artifact_best_effort(
            store,
            &remote_ref,
            &zip_dest,
            &artifacts.archive_path,
            opts,
        )
        .await;
    }

    Ok(PushOutcome::Ok {
        remote_ref: remote_ref_str,
    })
}
1610
1611/// Build a [`ProgressSink`] that emits one structured `tracing::info!`
1612/// line per chunk transferred during a bundle / zip-archive upload.
1613///
1614/// Issue #55. Git's helper protocol has no upload-progress channel
1615/// (the helper-protocol stdout is reserved for protocol traffic per
1616/// `.claude/rules/protocol-stdout.md`), so the bundle path's only way
1617/// to inform the user during a multi-GiB upload is `tracing::info!`,
1618/// which routes to stderr via the tracing-subscriber initialised in
1619/// `main()`. The LFS path keeps its own sink wiring (one
1620/// progress-event JSON line per chunk on stdout, governed by the LFS
1621/// custom-transfer protocol).
1622///
1623/// `total` is the bundle size at the moment we stat'd it. A short
1624/// race against a writer that re-stats during multipart planning
1625/// would only mis-format the log line — it cannot drive wrong-byte
1626/// behaviour. `None` renders "unknown" so the call site can
1627/// gracefully degrade when stat'ing the source fails.
1628///
1629/// The granularity of events is whatever the backend's `put_path`
1630/// provides: one event per completed multipart part / staged block
1631/// for bodies above [`crate::object_store::multipart::MULTIPART_PUT_THRESHOLD`],
1632/// one event total for bodies below it. With the default 16 MiB
1633/// part size and 8-way concurrency, a 1 GiB bundle emits ~64 lines
1634/// — ample motion to spot a stall, far short of log spam.
1635pub(crate) fn bundle_progress_sink(key: &str, total: Option<u64>) -> ProgressSink {
1636    // The closure needs an owned `String` because `ProgressSink` is
1637    // `'static`; cloning here keeps the call sites' borrow intact so
1638    // they can pass `&bundle_dest` to `put_path` without juggling
1639    // ownership.
1640    let key = key.to_owned();
1641    let bytes_so_far = Arc::new(AtomicU64::new(0));
1642    ProgressSink::new(move |bytes_amount| {
1643        // `Ordering::Relaxed` is enough: we're not synchronising with
1644        // any other state, just maintaining a monotonic counter for
1645        // log lines. The worst a re-ordered store could do is print
1646        // an out-of-order count, which `tracing` already disclaims.
1647        let so_far = bytes_so_far
1648            .fetch_add(bytes_amount, Ordering::Relaxed)
1649            .saturating_add(bytes_amount);
1650        // Render `total` as a Display value so the field is omitted
1651        // when stat'ing the source failed at the call site. Tracing's
1652        // `Option<u64>` rendering would print "None" — uglier than a
1653        // bare absence of the field.
1654        if let Some(t) = total {
1655            info!(
1656                key = %key,
1657                bytes_so_far = so_far,
1658                total = t,
1659                bytes_chunk = bytes_amount,
1660                "uploading"
1661            );
1662        } else {
1663            info!(
1664                key = %key,
1665                bytes_so_far = so_far,
1666                bytes_chunk = bytes_amount,
1667                "uploading"
1668            );
1669        }
1670    })
1671}
1672
/// Replace every control character in `s` with a space so the result
/// is safe to use as an HTTP header value.
///
/// "Control character" means [`char::is_control`]: the Unicode general
/// category `Cc`. That covers the C0 range (U+0000–U+001F, including
/// `\t`, `\r`, `\n`), DEL (U+007F) and the C1 range (U+0080–U+009F,
/// e.g. NEL U+0085) — not only ASCII controls. Every other character,
/// including non-ASCII text, is passed through unchanged. The output has
/// the same number of `char`s as the input.
///
/// `commit_msg` flows from `git::last_commit_message` (whose summary
/// is "everything before the first blank line" per gix) into the
/// `codepipeline-artifact-revision-summary` user-metadata header on
/// the zip-archive upload. A maliciously-crafted commit could embed
/// `\r\n` in the summary, which would be a CRLF injection on
/// transport — splitting one logical header into two and letting an
/// attacker forge arbitrary user-metadata headers on the uploaded
/// archive. Both backends' SDKs reject CRLF at the transport layer
/// today, but that defense is version-dependent and the resulting
/// error is a cryptic "invalid header" 400; sanitising here surfaces
/// a clean, predictable value at the call site instead.
fn sanitize_metadata_value(s: &str) -> String {
    s.chars()
        .map(|c| if c.is_control() { ' ' } else { c })
        .collect()
}
1692
/// Keys a plain (non-zip) ref owns under `<prefix>/<ref>/`: just its
/// single bundle object.
const DELETE_EXPECTED_NO_ZIP: usize = 1;
/// Keys a `?zip=1` ref owns: the bundle plus its `repo.zip` archive.
const DELETE_EXPECTED_WITH_ZIP: usize = DELETE_EXPECTED_NO_ZIP + 1;
1697
1698/// Handle a delete refspec (`:<remote_ref>`) UNDER the per-ref lock
1699/// acquired by [`push_one`]: list `<prefix>/<ref>/`, expect 1 (or 2
1700/// with zip) keys after filtering out the lock we hold, delete them
1701/// all, emit `ok` or the appropriate error.
1702///
1703/// Issue #133: this must run inside the lock window. A pre-lock
1704/// listing-then-sweep races a concurrent push that lands a new bundle
1705/// between the listing and the deletion, producing a silent false
1706/// success — the delete reports `ok` to git while the ref survives
1707/// with a different bundle on the server.
1708///
1709/// The lock key (`<prefix>/<ref>/LOCK#.lock`) is filtered from the
1710/// listing — `release_lock` removes it last, after this function
1711/// returns. Apart from that one filter, the listing is unfiltered and
1712/// counts `PROTECTED#` and `repo.zip` against the expected total.
1713///
1714/// Issue #128: the `PROTECTED#` marker check is the FIRST guard, run
1715/// against the fresh under-lock listing BEFORE any count-vs-expected
1716/// dispatch. The pre-#128 ordering only consulted the marker in the
1717/// `else if` mismatch branch, so a count-matching listing (e.g.
1718/// `[bundle.bundle, PROTECTED#]` in zip mode, or `[PROTECTED#]` alone
1719/// in non-zip mode) would sweep the marker and report `ok`. With the
1720/// guard at the top, that bypass is closed.
1721///
1722/// Four behaviours fall out:
1723///
1724/// 1. **Protected ref** — listing (lock filtered out) includes a key
1725///    whose final segment is the [`keys::PROTECTED_MARKER_SEGMENT`].
1726///    Emit a protection-specific refusal naming the management CLI's
1727///    `unprotect` workflow.
1728/// 2. **Count matches `expected`, no marker** — sweep the entries and
1729///    report `ok`.
1730/// 3. **No bundle present** — listing (lock filtered out) is empty.
1731///    Returns the `"not found"?` wire error. This now includes the case
1732///    of a ref whose only on-server state was a stale `LOCK#.lock`:
1733///    `acquire_lock` recovers the stale lock, the post-lock listing
1734///    contains only our newly-held lock, and the filter renders it
1735///    empty.
1736/// 4. **Genuine multi-bundle / corruption** — count exceeds `expected`
1737///    and no marker is present. Fall through to the doctor message.
1738///    Example listing:
1739///    `[ "<prefix>/refs/heads/main/<sha-a>.bundle",
1740///       "<prefix>/refs/heads/main/<sha-b>.bundle" ]`.
1741async fn delete_remote_ref_under_lock(
1742    store: &dyn ObjectStore,
1743    prefix: Option<&str>,
1744    remote_ref: &RefName,
1745    zip: bool,
1746    lock_key: &str,
1747) -> Result<PushOutcome, PushError> {
1748    let listing = ref_listing_prefix(prefix, remote_ref);
1749    let all_entries = store.list(&listing).await?;
1750    // Issue #133: filter out the lock key we hold. `release_lock`
1751    // removes it last; the sweep below must not touch it (deleting our
1752    // own lock mid-critical-section would let concurrent clients
1753    // acquire it). The remaining count is what the protocol's
1754    // count-vs-expected dispatch operates on.
1755    let entries: Vec<&ObjectMeta> = all_entries.iter().filter(|e| e.key != lock_key).collect();
1756    let expected = if zip {
1757        DELETE_EXPECTED_WITH_ZIP
1758    } else {
1759        DELETE_EXPECTED_NO_ZIP
1760    };
1761    let remote_ref_str = remote_ref.as_str().to_owned();
1762    // Issue #128: the canonical protection guard. Run it FIRST against
1763    // the fresh, under-lock listing, BEFORE the count-match deletion
1764    // branch. The pre-#128 ordering only checked for the marker in the
1765    // `else if` mismatch branch, so a count-matching listing (e.g.
1766    // `[bundle.bundle, PROTECTED#]` with `expected = 2` in zip mode, or
1767    // `[PROTECTED#]` alone with `expected = 1`) would sweep the marker
1768    // and complete the delete silently. `entries_have_protected_marker`
1769    // matches only the literal `PROTECTED#` last segment — never the
1770    // `LOCK#.lock` lock key — so scanning the unfiltered `all_entries`
1771    // here is safe and avoids re-deriving a filtered view solely for
1772    // this check.
1773    if keys::entries_have_protected_marker(&all_entries) {
1774        return Ok(PushOutcome::Error {
1775            remote_ref: remote_ref_str,
1776            message: DELETE_PROTECTION_MESSAGE.to_owned(),
1777        });
1778    }
1779    if entries.len() == expected {
1780        for entry in &entries {
1781            delete_idempotent(store, &entry.key).await?;
1782        }
1783        // Issue #151 defence-in-depth: confirm no `PROTECTED#` marker
1784        // sneaked in. The lock window is still open (the caller releases
1785        // it after we return), so a `protect`/`unprotect` racing this
1786        // delete would be blocked on the lock per #159. Finding a marker
1787        // here would indicate a contract violation; the helper logs at
1788        // `error!` and the delete still reports `ok` — see the helper
1789        // doc for the rationale.
1790        verify_no_orphan_protected_after_delete(store, prefix, remote_ref).await;
1791        Ok(PushOutcome::Ok {
1792            remote_ref: remote_ref_str,
1793        })
1794    } else if entries.is_empty() {
1795        Ok(PushOutcome::Error {
1796            remote_ref: remote_ref_str,
1797            message: r#""not found"?"#.to_owned(),
1798        })
1799    } else {
1800        // Genuine multi-bundle / corruption: `entries` has more than
1801        // `expected` items and no `PROTECTED#` marker (that case
1802        // returned at the top guard above). Fall through to the doctor
1803        // message — see issue #128 for the routing rationale.
1804        Ok(PushOutcome::Error {
1805            remote_ref: remote_ref_str,
1806            message:
1807                r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
1808                    .to_owned(),
1809        })
1810    }
1811}
1812
1813#[cfg(test)]
1814mod tests {
1815    use super::*;
1816    use crate::object_store::mock::MockStore;
1817    use crate::packchain::gc::baseline_tombstone_listing_prefix;
1818
1819    const SHA: &str = "0123456789abcdef0123456789abcdef01234567";
1820    /// A second 40-char hex SHA distinct from `SHA`. Used by tests that
1821    /// need to seed pre-existing state under a SHA different from
1822    /// `local_sha` so a regression cannot pass through coincidental key
1823    /// alignment between `pre_existing` and `bundle_dest`.
1824    const OTHER_SHA: &str = "ffffffffffffffffffffffffffffffffffffffff";
1825
1826    /// Compile-time guard: replaces the runtime `assert_ne!(other_sha, SHA, …)`
1827    /// that the constant lift removed. A typo making the two consts equal
1828    /// fails the build rather than silently letting a stale-remote test
1829    /// pass for the wrong reason.
1830    const _: () = {
1831        let a = SHA.as_bytes();
1832        let b = OTHER_SHA.as_bytes();
1833        let mut i = 0;
1834        let mut differs = a.len() != b.len();
1835        while i < a.len() && i < b.len() {
1836            if a[i] != b[i] {
1837                differs = true;
1838            }
1839            i += 1;
1840        }
1841        assert!(differs, "OTHER_SHA must differ from SHA");
1842    };
1843
1844    fn rn(s: &str) -> RefName {
1845        RefName::new(s).expect("RefName")
1846    }
1847
1848    // --- parse_push_args ----------------------------------------------
1849
1850    #[test]
1851    fn parse_push_args_accepts_canonical_form() {
1852        let spec = parse_push_args("refs/heads/main:refs/heads/main").expect("parse");
1853        assert!(!spec.force);
1854        assert_eq!(spec.local_spec, "refs/heads/main");
1855        assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
1856    }
1857
1858    #[test]
1859    fn parse_push_args_accepts_force_flag() {
1860        let spec = parse_push_args("+refs/heads/main:refs/heads/main").expect("parse");
1861        assert!(spec.force);
1862        assert_eq!(spec.local_spec, "refs/heads/main");
1863    }
1864
1865    #[test]
1866    fn parse_push_args_accepts_delete_form() {
1867        let spec = parse_push_args(":refs/heads/main").expect("parse");
1868        assert!(!spec.force);
1869        assert!(spec.local_spec.is_empty());
1870        assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
1871    }
1872
1873    #[test]
1874    fn parse_push_args_accepts_force_delete_form() {
1875        let spec = parse_push_args("+:refs/heads/main").expect("parse");
1876        assert!(spec.force);
1877        assert!(spec.local_spec.is_empty());
1878        assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
1879    }
1880
1881    #[test]
1882    fn parse_push_args_accepts_short_local() {
1883        let spec = parse_push_args("HEAD:refs/heads/main").expect("parse");
1884        assert_eq!(spec.local_spec, "HEAD");
1885    }
1886
1887    #[test]
1888    fn parse_push_args_rejects_missing_colon() {
1889        assert!(matches!(
1890            parse_push_args("refs/heads/main"),
1891            Err(PushError::Parse { .. })
1892        ));
1893    }
1894
1895    #[test]
1896    fn parse_push_args_rejects_empty_remote() {
1897        assert!(matches!(
1898            parse_push_args("refs/heads/main:"),
1899            Err(PushError::Parse { .. })
1900        ));
1901    }
1902
1903    #[test]
1904    fn parse_push_args_rejects_invalid_remote_ref() {
1905        assert!(matches!(
1906            parse_push_args("refs/heads/main:refs/heads/.bad"),
1907            Err(PushError::RemoteRef(_))
1908        ));
1909    }
1910
1911    #[test]
1912    fn parse_push_args_rejects_invalid_local_spec() {
1913        assert!(matches!(
1914            parse_push_args("refs/heads/.bad:refs/heads/main"),
1915            Err(PushError::InvalidLocalSpec(_))
1916        ));
1917    }
1918
1919    #[test]
1920    fn parse_push_args_rejects_embedded_whitespace() {
1921        assert!(matches!(
1922            parse_push_args("refs/heads/main:refs/heads/main extra"),
1923            Err(PushError::Parse { .. })
1924        ));
1925    }
1926
1927    #[test]
1928    fn parse_push_args_rejects_empty_input() {
1929        assert!(matches!(parse_push_args(""), Err(PushError::Parse { .. })));
1930    }
1931
1932    // --- key formatting -----------------------------------------------
1933
1934    #[test]
1935    fn key_formatters_with_prefix() {
1936        let r = rn("refs/heads/main");
1937        let sha = Sha::from_hex(SHA).unwrap();
1938        assert_eq!(
1939            keys::bundle_key(Some("repo"), &r, sha),
1940            format!("repo/refs/heads/main/{SHA}.bundle"),
1941        );
1942        assert_eq!(
1943            lock_key(Some("repo"), &r),
1944            "repo/refs/heads/main/LOCK#.lock"
1945        );
1946        assert_eq!(
1947            archive_key(Some("repo"), &r),
1948            "repo/refs/heads/main/repo.zip"
1949        );
1950        assert_eq!(head_key(Some("repo")), "repo/HEAD");
1951    }
1952
1953    #[test]
1954    fn key_formatters_with_no_prefix() {
1955        let r = rn("refs/heads/main");
1956        let sha = Sha::from_hex(SHA).unwrap();
1957        assert_eq!(
1958            keys::bundle_key(None, &r, sha),
1959            format!("refs/heads/main/{SHA}.bundle"),
1960        );
1961        assert_eq!(lock_key(None, &r), "refs/heads/main/LOCK#.lock");
1962        assert_eq!(archive_key(None, &r), "refs/heads/main/repo.zip");
1963        assert_eq!(head_key(None), "HEAD");
1964        // Empty-string prefix is treated identically to None.
1965        assert_eq!(head_key(Some("")), "HEAD");
1966        assert_eq!(lock_key(Some(""), &r), "refs/heads/main/LOCK#.lock");
1967    }
1968
1969    // --- bundle filter ------------------------------------------------
1970
1971    #[test]
1972    fn is_bundle_candidate_keeps_real_bundles() {
1973        assert!(is_bundle_candidate(&format!(
1974            "repo/refs/heads/main/{SHA}.bundle"
1975        )));
1976        assert!(is_bundle_candidate(&format!(
1977            "refs/heads/main/{SHA}.bundle"
1978        )));
1979    }
1980
1981    #[test]
1982    fn is_bundle_candidate_rejects_protected_zip_lock() {
1983        assert!(!is_bundle_candidate("repo/refs/heads/main/PROTECTED#"));
1984        assert!(!is_bundle_candidate("repo/refs/heads/main/repo.zip"));
1985        assert!(!is_bundle_candidate("repo/refs/heads/main/LOCK#.lock"));
1986        assert!(!is_bundle_candidate("repo/refs/heads/main/file.lock"));
1987        assert!(!is_bundle_candidate("repo/refs/heads/main/LOCKS/x"));
1988    }
1989
1990    /// Regression: refs whose names embed `.zip` as a substring must
1991    /// not be filtered out. Previously the predicate rejected any key
1992    /// containing `.zip` anywhere in the byte sequence.
1993    #[test]
1994    fn is_bundle_candidate_keeps_refs_containing_zip_substring() {
1995        assert!(is_bundle_candidate(&format!(
1996            "repo/refs/heads/v1.zip-rc1/{SHA}.bundle"
1997        )));
1998        assert!(is_bundle_candidate(&format!(
1999            "refs/heads/myrelease.zip-v1/{SHA}.bundle"
2000        )));
2001    }
2002
2003    /// Regression: refs whose names embed `LOCKS` as a substring must
2004    /// not be filtered out. Previously the predicate rejected any key
2005    /// containing `/LOCKS/` anywhere in the byte sequence.
2006    #[test]
2007    fn is_bundle_candidate_keeps_refs_containing_locks_substring() {
2008        assert!(is_bundle_candidate(&format!(
2009            "repo/refs/heads/LOCKS-feature/x/{SHA}.bundle"
2010        )));
2011        assert!(is_bundle_candidate(&format!(
2012            "refs/heads/LOCKS/sub/{SHA}.bundle"
2013        )));
2014    }
2015
2016    /// Regression: a ref-name segment ending in `.lock` is permitted by
2017    /// `gix_validate`; bundle keys under such refs must still match.
2018    /// The unwanted `.lock` sibling is `<ref>/LOCK#.lock`, where the
2019    /// final segment is `LOCK#.lock`, not `<sha>.bundle`.
2020    #[test]
2021    fn is_bundle_candidate_keeps_refs_containing_lock_substring() {
2022        assert!(is_bundle_candidate(&format!(
2023            "refs/heads/feature.lock-rc/{SHA}.bundle"
2024        )));
2025    }
2026
2027    // --- parse_remote_sha_from_key ------------------------------------
2028
2029    #[test]
2030    fn parse_remote_sha_from_key_extracts_lower_hex_40() {
2031        let sha = parse_remote_sha_from_key(&format!("repo/refs/heads/main/{SHA}.bundle"))
2032            .expect("parse");
2033        assert_eq!(sha.to_string(), SHA);
2034    }
2035
2036    #[test]
2037    fn parse_remote_sha_from_key_rejects_uppercase() {
2038        let upper = SHA.to_uppercase();
2039        assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{upper}.bundle")).is_none());
2040    }
2041
2042    #[test]
2043    fn parse_remote_sha_from_key_rejects_wrong_length() {
2044        let short = &SHA[..39];
2045        assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{short}.bundle")).is_none());
2046    }
2047
2048    #[test]
2049    fn parse_remote_sha_from_key_rejects_missing_extension() {
2050        assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{SHA}")).is_none());
2051    }
2052
2053    // --- bundles_for_ref / is_protected ------------------------------
2054
2055    #[tokio::test]
2056    async fn bundles_for_ref_filters_protected_zip_lock() {
2057        let store = MockStore::new();
2058        let r = rn("refs/heads/main");
2059        store.insert(
2060            format!("repo/refs/heads/main/{SHA}.bundle"),
2061            Bytes::from_static(b"b"),
2062        );
2063        store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
2064        store.insert("repo/refs/heads/main/repo.zip", Bytes::from_static(b""));
2065        store.insert("repo/refs/heads/main/LOCK#.lock", Bytes::from_static(b""));
2066        let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
2067            .await
2068            .unwrap();
2069        assert_eq!(bundles.len(), 1);
2070        assert!(bundles[0].key.ends_with(".bundle"));
2071    }
2072
2073    /// Regression for #109: a ref whose name contains `.zip` must
2074    /// not have its bundle silently filtered out.
2075    #[tokio::test]
2076    async fn bundles_for_ref_keeps_bundle_when_ref_name_contains_zip() {
2077        let store = MockStore::new();
2078        let r = rn("refs/heads/v1.zip-rc1");
2079        let bundle_key = format!("repo/refs/heads/v1.zip-rc1/{SHA}.bundle");
2080        store.insert(bundle_key.clone(), Bytes::from_static(b"b"));
2081        let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
2082            .await
2083            .unwrap();
2084        assert_eq!(bundles.len(), 1);
2085        assert_eq!(bundles[0].key, bundle_key);
2086    }
2087
2088    /// Regression for #109: a ref whose name contains `LOCKS` must
2089    /// not have its bundle silently filtered out.
2090    #[tokio::test]
2091    async fn bundles_for_ref_keeps_bundle_when_ref_name_contains_locks() {
2092        let store = MockStore::new();
2093        let r = rn("refs/heads/LOCKS-feature/x");
2094        let bundle_key = format!("repo/refs/heads/LOCKS-feature/x/{SHA}.bundle");
2095        store.insert(bundle_key.clone(), Bytes::from_static(b"b"));
2096        let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
2097            .await
2098            .unwrap();
2099        assert_eq!(bundles.len(), 1);
2100        assert_eq!(bundles[0].key, bundle_key);
2101    }
2102
    /// Issue #165: a caller-supplied `cached_hidden` set must satisfy
    /// the tombstone lookup — `bundles_for_ref` must NOT re-list
    /// `<prefix>/gc/` or fetch any tombstone body.
    ///
    /// The `CountingStore` decorator counts `list` calls whose prefix is
    /// exactly `repo/gc/`, and `get_bytes` calls under the baseline
    /// tombstone listing prefix. An uncached call first pins the
    /// baseline cost (one of each); the cached call must leave both
    /// counters unchanged, so a regression that drops the cache and
    /// re-walks the tombstone set fails this test.
    #[tokio::test]
    async fn bundles_for_ref_skips_tombstone_lookup_when_cache_provided() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// `MockStore` decorator that counts tombstone-related reads.
        struct CountingStore {
            // Backing store the decorator forwards to.
            inner: MockStore,
            // Number of `list` calls with prefix exactly `repo/gc/`.
            gc_lists: AtomicUsize,
            // Number of `get_bytes` calls under the baseline tombstone prefix.
            tombstone_gets: AtomicUsize,
        }

        // `put_path` is intentionally omitted from the forward list to
        // preserve the original behavior (trait default forwarding via
        // `Self::put_bytes`); this decorator never has `put_path` called
        // on it in practice. Only `list` and `get_bytes` are overridden,
        // to bump the counters before forwarding to `inner`.
        crate::delegate_to_inner_impl! {
            impl ObjectStore for CountingStore {
                forward: get_to_file, get_bytes_range,
                         put_bytes, put_if_absent,
                         head, copy, delete;

                async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
                    if prefix == "repo/gc/" {
                        self.gc_lists.fetch_add(1, Ordering::SeqCst);
                    }
                    self.inner.list(prefix).await
                }

                async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
                    if key.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))) {
                        self.tombstone_gets.fetch_add(1, Ordering::SeqCst);
                    }
                    self.inner.get_bytes(key).await
                }
            }
        }

        let inner = MockStore::new();
        let r = rn("refs/heads/main");
        // Seed: a bundle for the ref plus a baseline tombstone naming a
        // *different* SHA. The tombstone exists so a non-cached call
        // would have to fetch & parse it to compute the hide set.
        inner.insert(
            format!("repo/refs/heads/main/{SHA}.bundle"),
            Bytes::from_static(b"b"),
        );
        // Plausible tombstone body — `tombstoned_bundle_keys` parses it
        // as `BaselineTombstone` JSON. Body shape mirrors the one
        // written by `write_baseline_tombstone_unconditional`; `v`
        // must be `TOMBSTONE_SCHEMA_VERSION` or the parser rejects it
        // and the tombstone is silently skipped (defeating the test).
        let tomb_body = format!(
            r#"{{"v":1,"ref_name":"refs/heads/main","sha":"{OTHER_SHA}","marked_at":"2024-01-01T00:00:00Z"}}"#
        );
        // Stored under the baseline tombstone prefix, so fetching it
        // bumps `tombstone_gets`.
        inner.insert(
            format!(
                "{}test.json",
                baseline_tombstone_listing_prefix(Some("repo"))
            ),
            Bytes::from(tomb_body),
        );

        let store = CountingStore {
            inner,
            gc_lists: AtomicUsize::new(0),
            tombstone_gets: AtomicUsize::new(0),
        };

        // Baseline: a `None` cache pays for one `gc/` list + one
        // tombstone GET. Asserting these counts pins the un-optimised
        // shape so the contrast with the cached path is meaningful.
        let bundles_uncached = bundles_for_ref(&store, Some("repo"), &r, None)
            .await
            .unwrap();
        assert_eq!(bundles_uncached.len(), 1);
        assert_eq!(store.gc_lists.load(Ordering::SeqCst), 1);
        assert_eq!(store.tombstone_gets.load(Ordering::SeqCst), 1);

        // Cached path: hand in a hide set naming the tombstoned
        // OTHER_SHA bundle. NOTE(review): presumably this equals what
        // the uncached call derived from the tombstone — confirm against
        // `tombstoned_bundle_keys`. `bundles_for_ref` must consult the
        // set and skip the `gc/` walk entirely; a regression that drops
        // the cache and re-lists would bump these counters past 1.
        let hidden: HashSet<String> = [format!("repo/refs/heads/main/{OTHER_SHA}.bundle")]
            .into_iter()
            .collect();
        let bundles_cached = bundles_for_ref(&store, Some("repo"), &r, Some(&hidden))
            .await
            .unwrap();
        assert_eq!(bundles_cached.len(), 1);
        assert_eq!(
            store.gc_lists.load(Ordering::SeqCst),
            1,
            "cached call must not re-list gc/",
        );
        assert_eq!(
            store.tombstone_gets.load(Ordering::SeqCst),
            1,
            "cached call must not refetch any tombstone body",
        );
    }
2207
2208    #[tokio::test]
2209    async fn is_protected_detects_marker() {
2210        let store = MockStore::new();
2211        let r = rn("refs/heads/main");
2212        assert!(!is_protected(&store, Some("repo"), &r).await.unwrap());
2213        store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
2214        assert!(is_protected(&store, Some("repo"), &r).await.unwrap());
2215    }
2216
2217    /// Regression for #119: only the exact `PROTECTED#` key counts as a
2218    /// protection marker. A sibling key that merely starts with
2219    /// `PROTECTED#` (e.g. `PROTECTED#audit`) must not flip the result.
2220    #[tokio::test]
2221    async fn is_protected_ignores_protected_prefixed_sibling() {
2222        let store = MockStore::new();
2223        let r = rn("refs/heads/main");
2224        store.insert(
2225            "repo/refs/heads/main/PROTECTED#audit",
2226            Bytes::from_static(b""),
2227        );
2228        assert!(!is_protected(&store, Some("repo"), &r).await.unwrap());
2229    }
2230
2231    /// Regression for #119: `is_protected` must use `head`, not `list`.
2232    /// Arm `AccessDeniedOnAnyList`; if the implementation calls `list`
2233    /// at all, the fault fires and the call returns an error. We assert
2234    /// success and that the fault is still pending (i.e. unfired).
2235    #[tokio::test]
2236    async fn is_protected_uses_head_not_list() {
2237        use crate::object_store::mock::Fault;
2238        let store = MockStore::new();
2239        let r = rn("refs/heads/main");
2240        store.arm(Fault::AccessDeniedOnAnyList);
2241        let got = is_protected(&store, Some("repo"), &r).await.unwrap();
2242        assert!(!got);
2243        assert_eq!(store.pending_faults(), 1, "is_protected must not call list");
2244        // And the positive case too — still no list.
2245        store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
2246        let got = is_protected(&store, Some("repo"), &r).await.unwrap();
2247        assert!(got);
2248        assert_eq!(store.pending_faults(), 1, "is_protected must not call list");
2249    }
2250
2251    // --- acquire_lock / release_lock ----------------------------------
2252    //
2253    // Issue #118: `acquire_lock` returns a `LockGuard` instead of a
2254    // plain bool because the lock now carries a background heartbeat
2255    // task. Tests construct an `Arc<dyn ObjectStore>` so the heartbeat
2256    // task can clone the store; `MockStore`'s internal state is
2257    // already `Arc<Mutex<...>>`-shared, so the `Arc` clone is shape
2258    // bookkeeping, not extra state.
2259
2260    #[tokio::test]
2261    async fn acquire_lock_succeeds_when_absent() {
2262        let store = Arc::new(MockStore::new());
2263        let now = OffsetDateTime::now_utc();
2264        let guard = acquire_lock(
2265            Arc::clone(&store) as Arc<dyn ObjectStore>,
2266            "k",
2267            Duration::seconds(60),
2268            now,
2269        )
2270        .await
2271        .unwrap();
2272        assert!(guard.is_some(), "expected a fresh guard");
2273        assert!(store.contains("k"));
2274        // Drop the guard so the heartbeat task exits before the
2275        // runtime tears down (also covered explicitly by
2276        // `lock_guard_drop_aborts_heartbeat`).
2277        drop(guard);
2278    }
2279
2280    #[tokio::test]
2281    async fn acquire_lock_returns_none_when_recently_held() {
2282        let store = Arc::new(MockStore::new());
2283        let now = OffsetDateTime::now_utc();
2284        store.insert_with("k", Bytes::new(), now, PutOpts::default());
2285        let guard = acquire_lock(
2286            Arc::clone(&store) as Arc<dyn ObjectStore>,
2287            "k",
2288            Duration::seconds(60),
2289            now,
2290        )
2291        .await
2292        .unwrap();
2293        assert!(guard.is_none(), "expected contention");
2294    }
2295
2296    #[tokio::test]
2297    async fn acquire_lock_recovers_stale_lock() {
2298        let store = Arc::new(MockStore::new());
2299        let now = OffsetDateTime::now_utc();
2300        let stale = now - Duration::seconds(120);
2301        store.insert_with("k", Bytes::new(), stale, PutOpts::default());
2302        let guard = acquire_lock(
2303            Arc::clone(&store) as Arc<dyn ObjectStore>,
2304            "k",
2305            Duration::seconds(60),
2306            now,
2307        )
2308        .await
2309        .unwrap();
2310        assert!(guard.is_some(), "stale lock must be recoverable");
2311        // Lock still exists (we re-created it with put_if_absent).
2312        assert!(store.contains("k"));
2313        drop(guard);
2314    }
2315
2316    #[tokio::test]
2317    async fn acquire_lock_treats_disappeared_lock_as_contention() {
2318        // First put_if_absent says "exists", but head returns NotFound
2319        // (race: another client released between the calls). We must
2320        // surface contention, not error.
2321        use crate::object_store::mock::Fault;
2322        let store = MockStore::new();
2323        store.insert("k", Bytes::new());
2324        store.arm(Fault::NotFoundOnHead { key: "k".into() });
2325        let arc = Arc::new(store);
2326        let now = OffsetDateTime::now_utc();
2327        let guard = acquire_lock(
2328            Arc::clone(&arc) as Arc<dyn ObjectStore>,
2329            "k",
2330            Duration::seconds(60),
2331            now,
2332        )
2333        .await
2334        .unwrap();
2335        assert!(guard.is_none(), "expected contention on disappeared lock");
2336        // Confirm head() was actually called — a regression that skipped
2337        // the staleness branch and returned None directly would also
2338        // satisfy the assertion above. The fault firing proves head ran.
2339        assert_eq!(arc.pending_faults(), 0);
2340    }
2341
2342    #[tokio::test]
2343    async fn release_lock_deletes_existing_key() {
2344        let store = Arc::new(MockStore::new());
2345        let now = OffsetDateTime::now_utc();
2346        let guard = acquire_lock(
2347            Arc::clone(&store) as Arc<dyn ObjectStore>,
2348            "k",
2349            Duration::seconds(60),
2350            now,
2351        )
2352        .await
2353        .unwrap()
2354        .expect("acquire_lock must succeed on an empty store");
2355        release_lock(guard).await.unwrap();
2356        assert!(!store.contains("k"));
2357    }
2358
2359    #[tokio::test]
2360    async fn release_lock_swallows_not_found_when_lock_already_gone() {
2361        // Acquire a lock, then delete the key out-of-band before
2362        // release_lock runs. The release must map NotFound → Ok(()).
2363        let store = Arc::new(MockStore::new());
2364        let now = OffsetDateTime::now_utc();
2365        let guard = acquire_lock(
2366            Arc::clone(&store) as Arc<dyn ObjectStore>,
2367            "k",
2368            Duration::seconds(60),
2369            now,
2370        )
2371        .await
2372        .unwrap()
2373        .expect("acquire_lock must succeed");
2374        // Cancel the heartbeat first so it cannot race the manual
2375        // delete and re-create the key.
2376        guard.heartbeat.as_ref().unwrap().abort();
2377        // Give the abort a chance to take effect, then remove the key.
2378        tokio::task::yield_now().await;
2379        let _ = store.delete("k").await;
2380        release_lock(guard).await.unwrap();
2381    }
2382
2383    #[tokio::test]
2384    async fn release_lock_propagates_non_not_found_errors() {
2385        use crate::object_store::mock::Fault;
2386        let store = Arc::new(MockStore::new());
2387        let now = OffsetDateTime::now_utc();
2388        let guard = acquire_lock(
2389            Arc::clone(&store) as Arc<dyn ObjectStore>,
2390            "k",
2391            Duration::seconds(60),
2392            now,
2393        )
2394        .await
2395        .unwrap()
2396        .expect("acquire_lock must succeed");
2397        store.arm(Fault::NetworkOnDelete { key: "k".into() });
2398        let err = release_lock(guard).await.unwrap_err();
2399        assert!(
2400            matches!(err, ObjectStoreError::Network(_)),
2401            "expected Network error, got {err:?}",
2402        );
2403        // The fault fired exactly once.
2404        assert_eq!(store.pending_faults(), 0);
2405        // Key remains because the delete was faulted, not executed.
2406        assert!(store.contains("k"));
2407    }
2408
2409    /// Issue #118: a long-running critical section must not lose its
2410    /// lock. The heartbeat refreshes `last_modified` faster than the
2411    /// TTL expires, so a concurrent acquire after the original TTL
2412    /// elapses still sees a live lock and returns `None` (contention).
2413    #[tokio::test(start_paused = true)]
2414    async fn heartbeat_keeps_lock_alive_past_ttl() {
2415        let store = Arc::new(MockStore::new());
2416        let now = OffsetDateTime::now_utc();
2417        // TTL is 4 s → heartbeat fires every 2 s (see
2418        // `heartbeat_interval`). Run the test for ~10 s of virtual
2419        // time so a regression that disabled the heartbeat would have
2420        // multiple TTLs to expire under.
2421        let ttl = Duration::seconds(4);
2422        let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
2423            .await
2424            .unwrap()
2425            .expect("acquire must succeed");
2426
2427        // Advance the clock past several TTLs, letting the heartbeat
2428        // task fire each time.
2429        for _ in 0..5 {
2430            tokio::time::advance(std::time::Duration::from_secs(3)).await;
2431            // Yield so the spawned heartbeat task can take its turn
2432            // on the runtime and PUT the lock key.
2433            tokio::task::yield_now().await;
2434        }
2435
2436        // A concurrent acquire would see `last_modified` recent
2437        // (heartbeat just refreshed it), so it should report
2438        // contention — not steal the lock as stale. We use a `now`
2439        // far in the future (matches the wall-clock view a second
2440        // process would have) but rely on the heartbeat having
2441        // overwritten `last_modified` to the runtime "now".
2442        let future = OffsetDateTime::now_utc();
2443        let other = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, future)
2444            .await
2445            .unwrap();
2446        assert!(
2447            other.is_none(),
2448            "live lock must not be stealable while the holder's heartbeat runs",
2449        );
2450
2451        release_lock(guard).await.unwrap();
2452        assert!(!store.contains("k"));
2453    }
2454
2455    /// Releasing the guard must stop the heartbeat so no further PUTs
2456    /// hit the lock key after release. We assert by deleting the key
2457    /// post-release and confirming it stays gone.
2458    #[tokio::test(start_paused = true)]
2459    async fn release_lock_stops_heartbeat() {
2460        let store = Arc::new(MockStore::new());
2461        let now = OffsetDateTime::now_utc();
2462        let ttl = Duration::seconds(4);
2463        let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
2464            .await
2465            .unwrap()
2466            .expect("acquire must succeed");
2467        release_lock(guard).await.unwrap();
2468        assert!(!store.contains("k"));
2469
2470        // Advance well past multiple heartbeat intervals. A
2471        // regression that forgot to abort the task would re-create
2472        // the key via put_bytes.
2473        for _ in 0..5 {
2474            tokio::time::advance(std::time::Duration::from_secs(3)).await;
2475            tokio::task::yield_now().await;
2476        }
2477        assert!(
2478            !store.contains("k"),
2479            "heartbeat must not re-create the key after release",
2480        );
2481    }
2482
2483    /// Dropping the guard without calling `release_lock` aborts the
2484    /// heartbeat (so the lock becomes stealable after TTL) and leaves
2485    /// the lock key in place (a future caller's stale-recovery path
2486    /// reclaims it).
2487    #[tokio::test(start_paused = true)]
2488    async fn lock_guard_drop_aborts_heartbeat() {
2489        let store = Arc::new(MockStore::new());
2490        let now = OffsetDateTime::now_utc();
2491        let ttl = Duration::seconds(4);
2492        let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
2493            .await
2494            .unwrap()
2495            .expect("acquire must succeed");
2496        drop(guard);
2497
2498        // Capture the lock's last_modified right after drop —
2499        // heartbeats after this point would advance it. `head` is the
2500        // trait-level path to last_modified and avoids a test-only
2501        // accessor on MockStore.
2502        let after_drop = store.head("k").await.expect("lock present").last_modified;
2503
2504        // Advance time past multiple heartbeat intervals.
2505        for _ in 0..5 {
2506            tokio::time::advance(std::time::Duration::from_secs(3)).await;
2507            tokio::task::yield_now().await;
2508        }
2509
2510        let after_advance = store
2511            .head("k")
2512            .await
2513            .expect("lock still present")
2514            .last_modified;
2515        assert_eq!(
2516            after_drop, after_advance,
2517            "heartbeat must not refresh last_modified after drop",
2518        );
2519
2520        // And the lock is now stealable via the stale path: an acquire
2521        // with a `now` past TTL deletes the orphaned lock and reclaims.
2522        let future = now + Duration::seconds(120);
2523        let recovered = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, future)
2524            .await
2525            .unwrap();
2526        assert!(recovered.is_some(), "orphaned lock must be reclaimable");
2527        drop(recovered);
2528    }
2529
    /// Issue #150 regression: a heartbeat `put_bytes` already in
    /// flight when `release` is called must complete BEFORE the
    /// release issues its DELETE. The pre-fix code did
    /// `handle.abort(); delete`, which only cancelled the future at
    /// its next await point — an in-flight network PUT continued on
    /// the server and could settle AFTER the DELETE, resurrecting the
    /// lock key as an orphan.
    ///
    /// The test wraps the mock store in a barrier-gated `put_bytes`:
    /// the first PUT to the lock key records its start, then waits on
    /// a notify. The heartbeat tick therefore lands in a state where
    /// the PUT has been issued but not yet completed when `release`
    /// runs. The post-condition is the sequence of recorded
    /// operations: the in-flight PUT's completion must precede the
    /// DELETE's start. The expected sequence is derived from the
    /// invariant in the `LockGuard::release` doc comment (the spec),
    /// not from the code's current output (lesson #5).
    // The test body inlines a small `ObjectStore` decorator
    // (`GatedPutStore`) so the trait wiring inflates the line count
    // past clippy's default budget. Splitting the decorator into a
    // sibling helper would obscure the test's single behaviour
    // contract, so we accept the lint locally.
    #[allow(clippy::too_many_lines)]
    #[tokio::test(start_paused = true)]
    async fn release_awaits_in_flight_heartbeat_put_before_delete() {
        use std::sync::Mutex;
        use tokio::sync::Notify;

        /// Op-log entry recording the relative order of `put_bytes`
        /// and `delete` events. The release contract requires
        /// `PutEnd` to precede `DeleteStart`.
        #[derive(Debug, PartialEq, Eq, Clone, Copy)]
        enum Op {
            PutStart,
            PutEnd,
            DeleteStart,
            DeleteEnd,
        }

        /// Decorator that gates the FIRST `put_bytes` to `gated_key` on
        /// a notify barrier and logs that PUT's start and end. Later
        /// `put_bytes` calls pass through unlogged; they should not
        /// happen, because once we hold the gate, release should stop
        /// the heartbeat before another tick fires. Every `delete` is
        /// logged.
        struct GatedPutStore {
            // Backing store that all calls are forwarded to.
            inner: Arc<MockStore>,
            // Barrier the gated PUT parks on until the test opens it.
            put_gate: Arc<Notify>,
            // Shared op log, inspected by the test body.
            log: Arc<Mutex<Vec<Op>>>,
            // Only PUTs to this key are eligible for gating.
            gated_key: String,
            // Set by the first gated PUT so no later PUT is gated.
            gate_consumed: std::sync::atomic::AtomicBool,
        }

        #[async_trait::async_trait]
        impl ObjectStore for GatedPutStore {
            async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
                self.inner.list(prefix).await
            }
            async fn get_to_file(
                &self,
                key: &str,
                dest: &std::path::Path,
                opts: crate::object_store::GetOpts,
            ) -> Result<(), ObjectStoreError> {
                self.inner.get_to_file(key, dest, opts).await
            }
            async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
                self.inner.get_bytes(key).await
            }
            async fn get_bytes_range(
                &self,
                key: &str,
                range: std::ops::Range<u64>,
            ) -> Result<Bytes, ObjectStoreError> {
                self.inner.get_bytes_range(key, range).await
            }
            async fn put_bytes(
                &self,
                key: &str,
                body: Bytes,
                opts: PutOpts,
            ) -> Result<(), ObjectStoreError> {
                // `swap` returns the previous flag value, so only the first
                // PUT to `gated_key` sees `false` and is gated. `&&`
                // short-circuits, so PUTs to other keys never touch the flag.
                let is_gated = key == self.gated_key
                    && !self
                        .gate_consumed
                        .swap(true, std::sync::atomic::Ordering::SeqCst);
                if is_gated {
                    self.log.lock().unwrap().push(Op::PutStart);
                    // Park here until the test calls `put_gate.notify_one()`.
                    self.put_gate.notified().await;
                }
                let result = self.inner.put_bytes(key, body, opts).await;
                if is_gated {
                    self.log.lock().unwrap().push(Op::PutEnd);
                }
                result
            }
            async fn put_path(
                &self,
                key: &str,
                src: &std::path::Path,
                opts: PutOpts,
            ) -> Result<(), ObjectStoreError> {
                self.inner.put_path(key, src, opts).await
            }
            async fn put_if_absent(
                &self,
                key: &str,
                body: Bytes,
            ) -> Result<bool, ObjectStoreError> {
                self.inner.put_if_absent(key, body).await
            }
            async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
                self.inner.head(key).await
            }
            async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
                self.inner.copy(src, dst).await
            }
            async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
                self.log.lock().unwrap().push(Op::DeleteStart);
                let result = self.inner.delete(key).await;
                self.log.lock().unwrap().push(Op::DeleteEnd);
                result
            }
        }

        let inner = Arc::new(MockStore::new());
        let put_gate = Arc::new(Notify::new());
        let log = Arc::new(Mutex::new(Vec::<Op>::new()));
        let store = Arc::new(GatedPutStore {
            inner: Arc::clone(&inner),
            put_gate: Arc::clone(&put_gate),
            log: Arc::clone(&log),
            gated_key: "k".to_owned(),
            gate_consumed: std::sync::atomic::AtomicBool::new(false),
        });

        let now = OffsetDateTime::now_utc();
        let ttl = Duration::seconds(4);
        let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
            .await
            .unwrap()
            .expect("acquire must succeed");

        // Yield (several times) so the heartbeat task is polled and
        // consumes its immediate (zero-duration) first `tick.tick()`.
        // Without this, the next `advance` would fire the immediate tick
        // first and then the periodic tick, but the task still would not
        // be polled until the test yields again. We want it inside the
        // loop body before advancing time.
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        // Drive one heartbeat tick. The heartbeat task issues
        // `put_bytes`, which the gate intercepts and parks before the
        // inner write completes. The 2 s advance is assumed to cover at
        // least one heartbeat interval for a 4 s TTL. NOTE(review): an
        // earlier comment said the interval is `ttl/3 = 1s`, but
        // 4 s / 3 ≈ 1.33 s; confirm the formula in `heartbeat_interval`.
        tokio::time::advance(std::time::Duration::from_secs(2)).await;
        // Yield repeatedly so the spawned heartbeat task is polled
        // and reaches the gate; stop as soon as `PutStart` is logged.
        for _ in 0..16 {
            tokio::task::yield_now().await;
            if !log.lock().unwrap().is_empty() {
                break;
            }
        }
        assert_eq!(
            log.lock().unwrap().as_slice(),
            &[Op::PutStart],
            "heartbeat PUT must be in flight before release fires",
        );

        // Begin release on a separate task so we can observe the
        // ordering: release blocks on `stop_heartbeat`'s join-await,
        // which can only complete after the gated PUT finishes.
        let release_store = Arc::clone(&store);
        let release_handle = tokio::spawn(async move {
            // `let _ =` neither binds nor drops anything here; this line is
            // effectively a marker and is not needed for borrow checking.
            // `store` stays alive in the enclosing scope until after
            // `release_handle` is awaited.
            let _ = release_store; // no-op; see note above
            release_lock(guard).await
        });

        // Let the release task get as far as it can — up to the
        // join-await on the heartbeat task.
        for _ in 0..8 {
            tokio::task::yield_now().await;
        }
        // No DELETE may have fired yet: the heartbeat task is still
        // inside the gated `put_bytes`.
        assert_eq!(
            log.lock().unwrap().as_slice(),
            &[Op::PutStart],
            "release must NOT issue DELETE while heartbeat PUT is in flight",
        );

        // Open the gate: the in-flight PUT completes, the heartbeat
        // task exits the loop on its next shutdown re-check, and
        // release proceeds to DELETE.
        put_gate.notify_one();
        let result = release_handle.await.expect("release task panicked");
        result.expect("release_lock");

        // Full ordering oracle: PUT start → PUT end → DELETE start → DELETE end.
        let final_log = log.lock().unwrap().clone();
        assert_eq!(
            final_log,
            vec![Op::PutStart, Op::PutEnd, Op::DeleteStart, Op::DeleteEnd],
            "operation order must be: heartbeat PUT completes, THEN release DELETE",
        );
        assert!(
            !inner.contains("k"),
            "lock key must be deleted after release"
        );
    }
2740
2741    // --- delete_remote_ref_under_lock ---------------------------------
2742
2743    #[tokio::test]
2744    async fn delete_remote_ref_removes_single_bundle() {
2745        let store = MockStore::new();
2746        let r = rn("refs/heads/main");
2747        store.insert(
2748            format!("repo/refs/heads/main/{SHA}.bundle"),
2749            Bytes::from_static(b"b"),
2750        );
2751        let outcome = delete_remote_ref_under_lock(
2752            &store,
2753            Some("repo"),
2754            &r,
2755            false,
2756            "repo/refs/heads/main/LOCK#.lock",
2757        )
2758        .await
2759        .unwrap();
2760        assert_eq!(
2761            outcome,
2762            PushOutcome::Ok {
2763                remote_ref: "refs/heads/main".into()
2764            }
2765        );
2766        // Prefix-empty oracle (lesson 15): no key survives under the ref
2767        // prefix after a successful delete. Stronger than `!contains(bundle)`
2768        // because a regression that wrote a tombstone or other residue
2769        // (e.g. a re-introduction of c5468b4's bundle-engine tombstone)
2770        // would also trip it.
2771        let remaining: Vec<_> = store
2772            .keys()
2773            .into_iter()
2774            .filter(|k| k.starts_with("repo/refs/heads/main/"))
2775            .collect();
2776        assert!(
2777            remaining.is_empty(),
2778            "ref prefix must be empty after delete: {remaining:?}",
2779        );
2780        // Bundle-engine delete must not write a tombstone — tombstoning
2781        // is a packchain-only deferral (#143, #203).
2782        let gc_keys: Vec<_> = store
2783            .keys()
2784            .into_iter()
2785            .filter(|k| k.starts_with("repo/gc/"))
2786            .collect();
2787        assert!(
2788            gc_keys.is_empty(),
2789            "bundle-engine delete must not write a tombstone: {gc_keys:?}",
2790        );
2791    }
2792
2793    #[tokio::test]
2794    async fn delete_remote_ref_returns_not_found_when_empty() {
2795        let store = MockStore::new();
2796        let r = rn("refs/heads/main");
2797        let outcome = delete_remote_ref_under_lock(
2798            &store,
2799            Some("repo"),
2800            &r,
2801            false,
2802            "repo/refs/heads/main/LOCK#.lock",
2803        )
2804        .await
2805        .unwrap();
2806        match outcome {
2807            PushOutcome::Error { message, .. } => {
2808                assert_eq!(message, r#""not found"?"#);
2809            }
2810            PushOutcome::Ok { .. } => panic!("expected Error outcome"),
2811        }
2812    }
2813
2814    #[tokio::test]
2815    async fn delete_remote_ref_rejects_protected_marker() {
2816        // PROTECTED# is unfiltered for the delete-path count, but we
2817        // detect the marker before the generic multi-bundle error and
2818        // emit a protection-specific refusal that names `unprotect`.
2819        let store = MockStore::new();
2820        let r = rn("refs/heads/main");
2821        let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
2822        let protected = "repo/refs/heads/main/PROTECTED#";
2823        store.insert(&bundle, Bytes::from_static(b"b"));
2824        store.insert(protected, Bytes::from_static(b""));
2825        let outcome = delete_remote_ref_under_lock(
2826            &store,
2827            Some("repo"),
2828            &r,
2829            false,
2830            "repo/refs/heads/main/LOCK#.lock",
2831        )
2832        .await
2833        .unwrap();
2834        match outcome {
2835            PushOutcome::Error { message, .. } => {
2836                assert_eq!(
2837                    message,
2838                    r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2839                );
2840            }
2841            PushOutcome::Ok { .. } => panic!("expected Error outcome"),
2842        }
2843        // Both keys must remain — a regression that deleted on the way
2844        // to the error branch would still satisfy the message check.
2845        assert!(store.contains(&bundle));
2846        assert!(store.contains(protected));
2847    }
2848
2849    #[tokio::test]
2850    async fn delete_remote_ref_reports_corruption_without_protected_marker() {
2851        // Two bundles, no PROTECTED# marker → genuine corruption case
2852        // still falls through to the doctor message.
2853        let store = MockStore::new();
2854        let r = rn("refs/heads/main");
2855        let bundle_a = format!("repo/refs/heads/main/{SHA}.bundle");
2856        let bundle_b = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
2857        store.insert(&bundle_a, Bytes::from_static(b"a"));
2858        store.insert(&bundle_b, Bytes::from_static(b"b"));
2859        let outcome = delete_remote_ref_under_lock(
2860            &store,
2861            Some("repo"),
2862            &r,
2863            false,
2864            "repo/refs/heads/main/LOCK#.lock",
2865        )
2866        .await
2867        .unwrap();
2868        match outcome {
2869            PushOutcome::Error { message, .. } => {
2870                assert_eq!(
2871                    message,
2872                    r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#,
2873                );
2874            }
2875            PushOutcome::Ok { .. } => panic!("expected Error outcome"),
2876        }
2877        assert!(store.contains(&bundle_a));
2878        assert!(store.contains(&bundle_b));
2879    }
2880
2881    /// Issue #128: the canonical PROTECTED# guard must reject even when
2882    /// the count happens to match `expected`. Pre-#128 this listing
2883    /// `[bundle.bundle, PROTECTED#]` matched `expected = 2` in zip mode
2884    /// and was swept silently. Pin the guard: both keys survive, and
2885    /// the wire error is the protection-specific message — not the
2886    /// generic doctor message.
2887    #[tokio::test]
2888    async fn delete_remote_ref_rejects_protected_marker_when_count_matches_zip() {
2889        let store = MockStore::new();
2890        let r = rn("refs/heads/main");
2891        let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
2892        let protected = "repo/refs/heads/main/PROTECTED#";
2893        store.insert(&bundle, Bytes::from_static(b"b"));
2894        store.insert(protected, Bytes::from_static(b""));
2895        // zip = true → expected = 2, and the listing has exactly 2
2896        // entries (bundle + marker). Pre-#128 this fell through to the
2897        // count-match deletion branch.
2898        let outcome = delete_remote_ref_under_lock(
2899            &store,
2900            Some("repo"),
2901            &r,
2902            true,
2903            "repo/refs/heads/main/LOCK#.lock",
2904        )
2905        .await
2906        .unwrap();
2907        match outcome {
2908            PushOutcome::Error { message, .. } => {
2909                assert_eq!(
2910                    message,
2911                    r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2912                );
2913            }
2914            PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
2915        }
2916        assert!(store.contains(&bundle), "bundle must survive");
2917        assert!(store.contains(protected), "marker must survive");
2918    }
2919
2920    /// Issue #128: the protection guard must also fire when the lone
2921    /// remaining key under the ref prefix is the PROTECTED# marker
2922    /// itself. Pre-#128, `entries == [PROTECTED#]` matched `expected = 1`
2923    /// in non-zip mode and the marker was swept — the next push to the
2924    /// ref would then succeed against an unprotected branch even though
2925    /// the operator never ran `unprotect`. Pin the guard: the marker
2926    /// survives and the wire error is the protection-specific message.
2927    #[tokio::test]
2928    async fn delete_remote_ref_rejects_protected_marker_when_only_marker_present() {
2929        let store = MockStore::new();
2930        let r = rn("refs/heads/main");
2931        let protected = "repo/refs/heads/main/PROTECTED#";
2932        store.insert(protected, Bytes::from_static(b""));
2933        let outcome = delete_remote_ref_under_lock(
2934            &store,
2935            Some("repo"),
2936            &r,
2937            false,
2938            "repo/refs/heads/main/LOCK#.lock",
2939        )
2940        .await
2941        .unwrap();
2942        match outcome {
2943            PushOutcome::Error { message, .. } => {
2944                assert_eq!(
2945                    message,
2946                    r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2947                );
2948            }
2949            PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
2950        }
2951        assert!(store.contains(protected), "marker must survive");
2952    }
2953
2954    /// Issue #128: simulate the TOCTOU sequence the bug describes.
2955    /// Client A starts a delete; between `acquire_lock` and the
2956    /// under-lock listing, a marker appears at PROTECTED#. After #159
2957    /// `protect` itself acquires the same lock and cannot race here,
2958    /// but the under-lock listing remains the canonical guard against
2959    /// any other source of a same-key marker (a lock-bypass bug, a
2960    /// non-cooperating client). The fresh under-lock listing reflects
2961    /// the marker, so the canonical guard rejects the delete. Both the
2962    /// bundle and the marker survive.
2963    #[tokio::test]
2964    async fn delete_remote_ref_rejects_protect_landed_between_acquire_and_list() {
2965        let store = MockStore::new();
2966        let r = rn("refs/heads/main");
2967        let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
2968        let lock_key = "repo/refs/heads/main/LOCK#.lock";
2969        let protected = "repo/refs/heads/main/PROTECTED#";
2970        // Client A acquired the lock; bundle was already present.
2971        store.insert(&bundle, Bytes::from_static(b"b"));
2972        store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
2973        // A PROTECTED# marker appears AFTER our lock-acquire but BEFORE
2974        // our under-lock listing — exactly the window the pre-#128
2975        // ordering left open. Post-#159 `protect` cannot reach this
2976        // window itself, but the under-lock listing must still catch
2977        // markers from any other source (lock-bypass bug, non-cooperating
2978        // client).
2979        store.insert(protected, Bytes::from_static(b""));
2980
2981        let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
2982            .await
2983            .unwrap();
2984
2985        match outcome {
2986            PushOutcome::Error { message, .. } => {
2987                assert_eq!(
2988                    message,
2989                    r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2990                );
2991            }
2992            PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
2993        }
2994        assert!(store.contains(&bundle), "bundle must survive");
2995        assert!(store.contains(protected), "marker must survive");
2996        assert!(store.contains(lock_key), "held lock must survive");
2997    }
2998
2999    #[tokio::test]
3000    async fn delete_remote_ref_zip_mode_expects_two_keys() {
3001        let store = MockStore::new();
3002        let r = rn("refs/heads/main");
3003        let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
3004        let zip = "repo/refs/heads/main/repo.zip";
3005        store.insert(&bundle, Bytes::from_static(b"b"));
3006        store.insert(zip, Bytes::from_static(b""));
3007        let outcome = delete_remote_ref_under_lock(
3008            &store,
3009            Some("repo"),
3010            &r,
3011            true,
3012            "repo/refs/heads/main/LOCK#.lock",
3013        )
3014        .await
3015        .unwrap();
3016        assert_eq!(
3017            outcome,
3018            PushOutcome::Ok {
3019                remote_ref: "refs/heads/main".into()
3020            }
3021        );
3022        assert!(!store.contains(&bundle));
3023        assert!(!store.contains(zip));
3024    }
3025
3026    // --- PushOutcome rendering ----------------------------------------
3027
3028    #[test]
3029    fn push_outcome_renders_ok_line() {
3030        let line = PushOutcome::Ok {
3031            remote_ref: "refs/heads/main".into(),
3032        }
3033        .to_protocol_line();
3034        assert_eq!(line, "ok refs/heads/main\n");
3035    }
3036
3037    #[test]
3038    fn push_outcome_renders_error_line() {
3039        let line = PushOutcome::Error {
3040            remote_ref: "refs/heads/main".into(),
3041            message: r#""bad"?"#.into(),
3042        }
3043        .to_protocol_line();
3044        assert_eq!(line, "error refs/heads/main \"bad\"?\n");
3045    }
3046
3047    /// Both duplicate-bundle paths (pre-lock at ~line 482 and under-lock
3048    /// at ~line 600) must produce wire output ending in `"?\n`. The `?`
3049    /// suffix is the project-wide Rust convention for `error <ref> "..."`
3050    /// messages — git treats `"..."?` as recoverable and `"..."` as
3051    /// fatal. Both branches normalize to the recoverable form.
3052    #[test]
3053    fn duplicate_bundle_errors_use_consistent_wire_format() {
3054        let pre_lock_line = PushOutcome::Error {
3055            remote_ref: "refs/heads/main".into(),
3056            message:
3057                r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
3058                    .to_owned(),
3059        }
3060        .to_protocol_line();
3061        let under_lock_line = PushOutcome::Error {
3062            remote_ref: "refs/heads/main".into(),
3063            message: r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#.to_owned(),
3064        }
3065        .to_protocol_line();
3066
3067        assert_eq!(
3068            pre_lock_line,
3069            "error refs/heads/main \"multiple bundles exist on server. \
3070             Run git-remote-object-store doctor to fix.\"?\n",
3071        );
3072        assert_eq!(
3073            under_lock_line,
3074            "error refs/heads/main \"multiple bundles exist for the same ref on server. \
3075             Run git-remote-object-store doctor to fix.\"?\n",
3076        );
3077        assert!(pre_lock_line.ends_with("\"?\n"));
3078        assert!(under_lock_line.ends_with("\"?\n"));
3079    }
3080
3081    // --- lock_ttl_from_env --------------------------------------------
3082
    /// Pins how the lock TTL is resolved from `ENV_LOCK_TTL_SECONDS`:
    /// `lock_ttl_from_env` falls back to `DEFAULT_LOCK_TTL_SECONDS` when the
    /// variable is unset, non-numeric, or `0`, and uses a positive value
    /// verbatim. `resolve_lock_ttl_seconds(None)` and
    /// `resolve_lock_ttl_seconds(Some(0))` follow that same env-or-default
    /// result in both the unset and the overridden state.
    #[test]
    fn lock_ttl_env_override_falls_back_for_unset_invalid_or_zero() {
        // Group all env-var cases in one test fn so they share a single
        // `EnvGuard` and its per-key lock — the var is process-global,
        // and the guard serialises against `manage::doctor`'s
        // env-touching test that pokes the same key. Drop restores the
        // prior value on every exit path, including assertion panics.
        // The assertions below are order-dependent: each `env.clear()` /
        // `env.set_to(..)` changes what the following assertions observe.
        let env = crate::test_util::EnvGuard::take(ENV_LOCK_TTL_SECONDS);
        let default_ttl = Duration::seconds(i64::try_from(DEFAULT_LOCK_TTL_SECONDS).unwrap());
        // Unset returns default.
        env.clear();
        assert_eq!(lock_ttl_from_env(), default_ttl);
        // `None` and `Some(0)` (issue #208) defer to env-or-default.
        assert_eq!(
            resolve_lock_ttl_seconds(None),
            DEFAULT_LOCK_TTL_SECONDS,
            "None must defer to env-or-default",
        );
        assert_eq!(
            resolve_lock_ttl_seconds(Some(0)),
            DEFAULT_LOCK_TTL_SECONDS,
            "Some(0) must not defeat per-ref locking (issue #208)",
        );
        // Non-numeric falls back.
        env.set_to("not-a-number");
        assert_eq!(lock_ttl_from_env(), default_ttl);
        // Zero falls back (would defeat per-ref locking).
        env.set_to("0");
        assert_eq!(lock_ttl_from_env(), default_ttl);
        // Positive integer wins.
        env.set_to("120");
        assert_eq!(lock_ttl_from_env(), Duration::seconds(120));
        // With env set, `None` and `Some(0)` honour the env override —
        // an operator's env var must still take effect when a CLI
        // consumer accidentally passes the wrong default.
        assert_eq!(
            resolve_lock_ttl_seconds(None),
            120,
            "None must honour env override",
        );
        assert_eq!(
            resolve_lock_ttl_seconds(Some(0)),
            120,
            "Some(0) must honour env override",
        );
    }
3129
3130    // --- saturating_duration_seconds (issue #221) ---------------------
3131
3132    #[test]
3133    fn saturating_duration_seconds_caps_at_i64_max() {
3134        // `u64::MAX` exceeds `i64::MAX` — the helper must saturate at
3135        // `i64::MAX` rather than panic on the `try_from`. This is the
3136        // ~292-billion-year sentinel ceiling shared by all TTL paths.
3137        assert_eq!(
3138            saturating_duration_seconds(u64::MAX),
3139            Duration::seconds(i64::MAX),
3140        );
3141    }
3142
3143    #[test]
3144    fn saturating_duration_seconds_passes_normal_value() {
3145        assert_eq!(saturating_duration_seconds(60), Duration::seconds(60));
3146    }
3147
3148    // --- resolve_lock_ttl_seconds (issue #208) ------------------------
3149    //
3150    // `Compact::run_into` used to accept `Some(0)` and feed it straight
3151    // into the engine, bypassing the `lock_ttl_from_env` zero-clamp
3152    // from #112 and defeating per-ref locking. The shared resolver
3153    // collapses both `None` and `Some(0)` onto the env-or-default path
3154    // so the lock-acquiring call site cannot re-introduce the footgun.
3155    // `Doctor::resolved_lock_ttl_seconds` deliberately does NOT route
3156    // through this resolver — doctor only compares lock ages and never
3157    // acquires a lock, so an operator-explicit `Some(0)` is a valid
3158    // "treat every lock as stale" request and is honoured.
3159
3160    #[test]
3161    fn resolve_lock_ttl_some_positive_returns_unchanged() {
3162        // Positive values bypass the env entirely and so are safe to
3163        // assert in parallel with the env-touching test below. Cover
3164        // the smallest valid value, a normal value, and the u64 ceiling
3165        // — the ceiling documents the deliberate decision to not impose
3166        // an upper bound: downstream `time::Duration::seconds` saturates
3167        // safely at `i64::MAX`.
3168        assert_eq!(resolve_lock_ttl_seconds(Some(1)), 1);
3169        assert_eq!(resolve_lock_ttl_seconds(Some(120)), 120);
3170        assert_eq!(resolve_lock_ttl_seconds(Some(u64::MAX)), u64::MAX);
3171    }
3172
3173    // --- FORMAT key write via perform_push_under_lock --------------------
3174
3175    /// Helper: run `perform_push_under_lock` against a temporary bundle file
3176    /// so we can assert on the resulting store state.
3177    async fn push_under_lock_with_bundle(
3178        store: &MockStore,
3179        prefix: Option<&str>,
3180        engine: StorageEngine,
3181    ) -> PushOutcome {
3182        let r = rn("refs/heads/main");
3183        let temp_dir = tempfile::Builder::new()
3184            .prefix("test_push_")
3185            .tempdir()
3186            .unwrap();
3187        let bundle_path = temp_dir.path().join("bundle");
3188        std::fs::write(&bundle_path, b"fake bundle").unwrap();
3189
3190        let state = PushReadyState {
3191            remote_ref: r,
3192            local_sha: Sha::from_hex(SHA).unwrap(),
3193            pre_existing: None,
3194            bundle_path,
3195            zip_artifacts: None,
3196            engine,
3197            force: false,
3198            pre_existing_was_ancestor: true,
3199            local_spec: "refs/heads/main".to_owned(),
3200            hidden_bundles: HashSet::new(),
3201            _temp_dir: temp_dir,
3202        };
3203
3204        perform_push_under_lock(store, prefix, BackendKind::S3, state)
3205            .await
3206            .unwrap()
3207    }
3208
3209    #[tokio::test]
3210    async fn perform_push_under_lock_writes_format_key_on_first_push() {
3211        let store = MockStore::new();
3212        let outcome =
3213            push_under_lock_with_bundle(&store, Some("repo"), StorageEngine::Bundle).await;
3214        assert!(
3215            matches!(outcome, PushOutcome::Ok { .. }),
3216            "expected Ok outcome"
3217        );
3218        assert!(
3219            store.contains("repo/FORMAT"),
3220            "FORMAT key must be written on the first push",
3221        );
3222        let content = store.get_bytes("repo/FORMAT").await.unwrap();
3223        assert_eq!(content.as_ref(), b"bundle");
3224    }
3225
3226    #[tokio::test]
3227    async fn perform_push_under_lock_writes_format_key_without_prefix() {
3228        let store = MockStore::new();
3229        let outcome = push_under_lock_with_bundle(&store, None, StorageEngine::Bundle).await;
3230        assert!(
3231            matches!(outcome, PushOutcome::Ok { .. }),
3232            "expected Ok outcome"
3233        );
3234        assert!(
3235            store.contains("FORMAT"),
3236            "FORMAT key must be written at root when no prefix",
3237        );
3238        let content = store.get_bytes("FORMAT").await.unwrap();
3239        assert_eq!(content.as_ref(), b"bundle");
3240    }
3241
3242    #[tokio::test]
3243    async fn perform_push_under_lock_format_key_is_idempotent() {
3244        // If FORMAT already exists (second push), put_if_absent is a no-op.
3245        // Pre-insert with a trailing newline so the original bytes differ from
3246        // what the push would write — a plain `put` would overwrite to
3247        // b"bundle", while put_if_absent must preserve b"bundle\n".
3248        let store = MockStore::new();
3249        store.insert("repo/FORMAT", Bytes::from_static(b"bundle\n"));
3250        let outcome =
3251            push_under_lock_with_bundle(&store, Some("repo"), StorageEngine::Bundle).await;
3252        assert!(
3253            matches!(outcome, PushOutcome::Ok { .. }),
3254            "expected Ok outcome"
3255        );
3256        // Original content preserved — put_if_absent did not overwrite.
3257        let content = store.get_bytes("repo/FORMAT").await.unwrap();
3258        assert_eq!(content.as_ref(), b"bundle\n");
3259    }
3260
3261    // --- stale-remote guard -------------------------------------------
3262
3263    /// Build a `PushReadyState` with a specific `pre_existing` key and a
3264    /// matching `bundle_path` on disk. The store is left in whatever state
3265    /// the caller configured (e.g. with a pre-seeded bundle key); the
3266    /// caller already knows which prefix it seeded under, so this helper
3267    /// does not need to take it.
3268    fn push_state_with_pre_existing(pre_existing: Option<String>) -> PushReadyState {
3269        let r = rn("refs/heads/main");
3270        let temp_dir = tempfile::Builder::new()
3271            .prefix("test_push_")
3272            .tempdir()
3273            .unwrap();
3274        let bundle_path = temp_dir.path().join("bundle");
3275        std::fs::write(&bundle_path, b"fake bundle").unwrap();
3276        PushReadyState {
3277            remote_ref: r,
3278            local_sha: Sha::from_hex(SHA).unwrap(),
3279            pre_existing,
3280            bundle_path,
3281            zip_artifacts: None,
3282            engine: StorageEngine::Bundle,
3283            force: false,
3284            pre_existing_was_ancestor: true,
3285            local_spec: "refs/heads/main".to_owned(),
3286            hidden_bundles: HashSet::new(),
3287            _temp_dir: temp_dir,
3288        }
3289    }
3290
3291    /// `pre_existing=None`, `current_key=Some(...)`: a concurrent push
3292    /// created a bundle after our pre-lock list but before we acquired
3293    /// the lock.
3294    ///
3295    /// The seeded `existing_key` deliberately uses a SHA distinct from
3296    /// the local push's `local_sha` (= `SHA`). If they matched, a
3297    /// regression that compared `current_key` against `bundle_dest`
3298    /// (the key derived from `local_sha`) instead of against
3299    /// `pre_existing` could pass for the wrong reason — the keys
3300    /// happen to align. With distinct SHAs the only way the test
3301    /// passes is the correct comparison: `pre_existing(None) !=
3302    /// current_key(Some)`.
3303    #[tokio::test]
3304    async fn perform_push_under_lock_rejects_none_to_some_stale_remote() {
3305        let store = MockStore::new();
3306        let existing_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3307        store.insert(&existing_key, Bytes::from_static(b"old bundle"));
3308        let state = push_state_with_pre_existing(None);
3309        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3310            .await
3311            .unwrap();
3312        assert!(
3313            matches!(
3314                &outcome,
3315                PushOutcome::Error { message, .. }
3316                    if message == r#""stale remote. Please fetch and retry."?"#
3317            ),
3318            "expected stale-remote error, got {outcome:?}",
3319        );
3320    }
3321
3322    /// `pre_existing=Some(key)`, `current_key=None`: the bundle was
3323    /// deleted between our pre-lock list and the lock acquisition (e.g.
3324    /// a concurrent `git push :<ref>` delete).
3325    #[tokio::test]
3326    async fn perform_push_under_lock_rejects_some_to_none_stale_remote() {
3327        let store = MockStore::new();
3328        let old_key = format!("repo/refs/heads/main/{SHA}.bundle");
3329        let state = push_state_with_pre_existing(Some(old_key.clone()));
3330        // Store is empty — the previously-seen bundle is gone.
3331        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3332            .await
3333            .unwrap();
3334        assert!(
3335            matches!(
3336                &outcome,
3337                PushOutcome::Error { message, .. }
3338                    if message == r#""stale remote. Please fetch and retry."?"#
3339            ),
3340            "expected stale-remote error, got {outcome:?}",
3341        );
3342    }
3343
3344    /// `pre_existing=Some(key_a)`, `current_key=Some(key_b)` where
3345    /// `key_a != key_b`: a concurrent push replaced our bundle.
3346    #[tokio::test]
3347    async fn perform_push_under_lock_rejects_replaced_bundle_stale_remote() {
3348        let store = MockStore::new();
3349        let old_sha = SHA;
3350        let new_sha = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
3351        let old_key = format!("repo/refs/heads/main/{old_sha}.bundle");
3352        let new_key = format!("repo/refs/heads/main/{new_sha}.bundle");
3353        // Under-lock the store shows the *new* key.
3354        store.insert(&new_key, Bytes::from_static(b"new bundle"));
3355        let state = push_state_with_pre_existing(Some(old_key));
3356        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3357            .await
3358            .unwrap();
3359        assert!(
3360            matches!(
3361                &outcome,
3362                PushOutcome::Error { message, .. }
3363                    if message == r#""stale remote. Please fetch and retry."?"#
3364            ),
3365            "expected stale-remote error, got {outcome:?}",
3366        );
3367    }
3368
3369    /// Under-lock re-listing sees two bundles for the same ref (two clients
3370    /// raced and both uploaded before either acquired the lock). Must return
3371    /// the under-lock duplicate-bundle error before reaching the stale-remote
3372    /// guard or the upload.
3373    #[tokio::test]
3374    async fn perform_push_under_lock_rejects_two_bundles_seen_under_lock() {
3375        let store = MockStore::new();
3376        let sha_a = "1111111111111111111111111111111111111111";
3377        let sha_b = "2222222222222222222222222222222222222222";
3378        store.insert(
3379            format!("repo/refs/heads/main/{sha_a}.bundle"),
3380            Bytes::from_static(b"bundle_a"),
3381        );
3382        store.insert(
3383            format!("repo/refs/heads/main/{sha_b}.bundle"),
3384            Bytes::from_static(b"bundle_b"),
3385        );
3386        // Pre-lock snapshot saw zero bundles; under-lock sees two.
3387        let state = push_state_with_pre_existing(None);
3388        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3389            .await
3390            .unwrap();
3391        assert!(
3392            matches!(
3393                &outcome,
3394                PushOutcome::Error { message, .. }
3395                    if message == r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#
3396            ),
3397            "expected under-lock multi-bundle error, got {outcome:?}",
3398        );
3399        // Neither bundle was overwritten or deleted.
3400        assert!(store.contains(&format!("repo/refs/heads/main/{sha_a}.bundle")));
3401        assert!(store.contains(&format!("repo/refs/heads/main/{sha_b}.bundle")));
3402    }
3403
3404    /// Stale-remote happy path: `pre_existing == current_key`. The four
3405    /// drift scenarios above all assert that mismatch produces an error.
3406    /// This case asserts that the *match* path goes through to a normal
3407    /// `Ok(remote_ref)` push outcome — without it, a regression that
3408    /// flipped the comparison to always-mismatch would pass every drift
3409    /// test and silently break every real push.
3410    ///
3411    /// `pre_existing` is seeded under SHA distinct from `local_sha`
3412    /// (mirroring the `rejects_none_to_some` hardening): so a buggy
3413    /// regression that compared `current_key` against `bundle_dest`
3414    /// (the key derived from `local_sha`) instead of against
3415    /// `pre_existing` cannot pass for the wrong reason — the keys
3416    /// are deliberately different. The push must:
3417    ///   1. produce `Ok(remote_ref)`
3418    ///   2. upload the new bundle at `bundle_dest` (SHA = `local_sha`)
3419    ///   3. write a baseline tombstone naming `OTHER_SHA` (issue #157
3420    ///      defers the prior-bundle delete to `gc sweep`) and leave
3421    ///      the prior bundle in place until grace expires
3422    ///   4. write `HEAD` and `FORMAT`
3423    #[tokio::test]
3424    async fn perform_push_under_lock_passes_through_when_pre_existing_matches_current() {
3425        let store = MockStore::new();
3426        let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3427        // Seed under `OTHER_SHA` so under-lock list returns this key.
3428        // The local push's `local_sha` is `SHA`, so `bundle_dest` is a
3429        // DIFFERENT key. The stale-remote check passes (pre_existing ==
3430        // current_key, both = pre_key); the function then uploads the
3431        // new bundle at `bundle_dest` and writes a baseline tombstone
3432        // naming the old bundle's SHA.
3433        store.insert(&pre_key, Bytes::from_static(b"old bundle"));
3434        let state = push_state_with_pre_existing(Some(pre_key.clone()));
3435        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3436            .await
3437            .unwrap();
3438        assert!(
3439            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3440            "expected Ok(refs/heads/main), got {outcome:?}",
3441        );
3442        // The new bundle must land at the bundle_dest derived from
3443        // local_sha, not at pre_key.
3444        let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3445        let new_bytes = store
3446            .get_bytes(&bundle_dest)
3447            .await
3448            .expect("new bundle must be uploaded at bundle_dest");
3449        assert_eq!(
3450            new_bytes.as_ref(),
3451            b"fake bundle",
3452            "new bundle must contain the local payload",
3453        );
3454        // Issue #157: the old bundle must remain readable — fetchers
3455        // that advertised it via an earlier `list` need the grace
3456        // window. `gc sweep` will reclaim it later.
3457        assert!(
3458            store.contains(&pre_key),
3459            "old bundle at pre_key must survive the push (deferred via tombstone)",
3460        );
3461        // A single baseline tombstone naming the prior SHA must exist.
3462        let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
3463        let metas = store.list("repo/gc/").await.unwrap();
3464        let tombstones: Vec<_> = metas
3465            .iter()
3466            .filter(|m| m.key.starts_with(&tomb_listing))
3467            .collect();
3468        assert_eq!(
3469            tombstones.len(),
3470            1,
3471            "exactly one baseline tombstone must be written; got keys: {:?}",
3472            tombstones.iter().map(|m| &m.key).collect::<Vec<_>>(),
3473        );
3474        let body = store.get_bytes(&tombstones[0].key).await.unwrap();
3475        let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
3476        assert_eq!(
3477            parsed["ref_name"].as_str(),
3478            Some("refs/heads/main"),
3479            "tombstone must name the pushed ref",
3480        );
3481        assert_eq!(
3482            parsed["sha"].as_str(),
3483            Some(OTHER_SHA),
3484            "tombstone must name the prior SHA so `gc sweep` reclaims OTHER_SHA.bundle",
3485        );
3486        assert!(
3487            store.contains("repo/FORMAT"),
3488            "FORMAT key must be written by the push",
3489        );
3490        assert!(
3491            store.contains("repo/HEAD"),
3492            "HEAD key must be written by the push",
3493        );
3494    }
3495
3496    /// Issue #121 + #157 regression: the prior-bundle cleanup must
3497    /// never fail the push. The new bundle is already durable;
3498    /// reporting failure misrepresents the remote state. Match the
3499    /// `compact` / `force_push_baseline_cleanup` best-effort contract:
3500    /// log at warn and report success.
3501    ///
3502    /// Under issue #157 the preferred cleanup path is a baseline
3503    /// tombstone (deferred reclamation). When the tombstone PUT
3504    /// itself fails, the fallback synchronous delete runs — so this
3505    /// test arms a fault on BOTH the tombstone PUT and the prior
3506    /// bundle's delete to exercise the worst-case orphan path. The
3507    /// two-bundle state remains on the bucket so the next push's
3508    /// under-lock multi-bundle guard surfaces it to the operator.
3509    #[tokio::test]
3510    async fn perform_push_under_lock_succeeds_when_prior_cleanup_fails() {
3511        use crate::object_store::mock::Fault;
3512        let store = MockStore::new();
3513        let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3514        store.insert(&pre_key, Bytes::from_static(b"old bundle"));
3515        // Fail the tombstone PUT (any key under the baseline-tomb
3516        // namespace) AND the fallback synchronous delete of the
3517        // prior bundle. The push must still report Ok.
3518        store.arm(Fault::NetworkOnPutBytesPrefix {
3519            prefix: baseline_tombstone_listing_prefix(Some("repo")),
3520        });
3521        store.arm(Fault::NetworkOnDelete {
3522            key: pre_key.clone(),
3523        });
3524
3525        let state = push_state_with_pre_existing(Some(pre_key.clone()));
3526        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3527            .await
3528            .expect("push must succeed even when prior-bundle cleanup fails");
3529        assert!(
3530            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3531            "expected Ok(refs/heads/main), got {outcome:?}",
3532        );
3533
3534        let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3535        assert!(
3536            store.contains(&bundle_dest),
3537            "new bundle must be uploaded at bundle_dest",
3538        );
3539        // Both faults fired: tombstone PUT first, fallback delete
3540        // second. None should remain pending.
3541        assert_eq!(store.pending_faults(), 0);
3542        // Orphan remains on the bucket — operator sees the warn log and
3543        // the next push's multi-bundle guard will direct them to doctor.
3544        assert!(
3545            store.contains(&pre_key),
3546            "cleanup faults must leave the prior bundle in place",
3547        );
3548        // No tombstone was written (the PUT failed); the operator path
3549        // is the multi-bundle guard, not deferred reclamation.
3550        let metas = store.list("repo/gc/").await.unwrap();
3551        let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
3552        assert!(
3553            !metas.iter().any(|m| m.key.starts_with(&tomb_listing)),
3554            "no baseline tombstone must remain after a failed PUT",
3555        );
3556    }
3557
3558    /// Issue #157 fallback path: when the baseline tombstone PUT fails
3559    /// but the synchronous delete succeeds, the prior bundle is
3560    /// reclaimed immediately. This is the recovery shape that
3561    /// preserves the issue #121 "two-bundle state surfaces via doctor"
3562    /// invariant — without the fallback delete, a tombstone PUT
3563    /// failure would orphan the prior bundle indefinitely (gc sweep
3564    /// has no tombstone to act on).
3565    #[tokio::test]
3566    async fn perform_push_under_lock_falls_back_to_sync_delete_on_tombstone_put_failure() {
3567        use crate::object_store::mock::Fault;
3568        let store = MockStore::new();
3569        let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3570        store.insert(&pre_key, Bytes::from_static(b"old bundle"));
3571        // Fail only the tombstone PUT — the prior-bundle delete is
3572        // left armable-free so the fallback path completes.
3573        store.arm(Fault::NetworkOnPutBytesPrefix {
3574            prefix: baseline_tombstone_listing_prefix(Some("repo")),
3575        });
3576
3577        let state = push_state_with_pre_existing(Some(pre_key.clone()));
3578        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3579            .await
3580            .expect("push must succeed when tombstone PUT fails but fallback delete succeeds");
3581        assert!(
3582            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3583            "expected Ok(refs/heads/main), got {outcome:?}",
3584        );
3585
3586        // The fault on the tombstone PUT fired; no tombstone remains.
3587        assert_eq!(store.pending_faults(), 0);
3588        let metas = store.list("repo/gc/").await.unwrap();
3589        let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
3590        assert!(
3591            !metas.iter().any(|m| m.key.starts_with(&tomb_listing)),
3592            "tombstone PUT failed, so no baseline-tomb key may exist",
3593        );
3594        // The fallback synchronous delete succeeded — prior bundle gone.
3595        assert!(
3596            !store.contains(&pre_key),
3597            "fallback synchronous delete must reclaim the prior bundle",
3598        );
3599        // The new bundle is durable.
3600        let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3601        assert!(
3602            store.contains(&bundle_dest),
3603            "new bundle must be uploaded at bundle_dest",
3604        );
3605    }
3606
3607    /// Issue #127 regression: a non-`NotFound` error on the optional
3608    /// zip artifact upload after the new bundle has been put must NOT
3609    /// fail the push. The bundle, `HEAD`, and `FORMAT` are already
3610    /// durable; reporting failure misrepresents the remote state. The
3611    /// zip is a CodePipeline-side convenience surface — bundle
3612    /// availability is what determines whether `git clone`/`fetch`
3613    /// work. Match the `delete_prior_bundle_best_effort` contract: log
3614    /// at warn and report success.
3615    #[tokio::test]
3616    async fn perform_push_under_lock_succeeds_when_zip_upload_fails() {
3617        use crate::object_store::mock::Fault;
3618        let store = MockStore::new();
3619
3620        // Build a PushReadyState with zip_artifacts present so the
3621        // zip-upload block runs. The archive file is written to disk
3622        // so a future refactor that re-orders the fault check vs. the
3623        // file read still has a real file to act on.
3624        let r = rn("refs/heads/main");
3625        let temp_dir = tempfile::Builder::new()
3626            .prefix("test_push_")
3627            .tempdir()
3628            .unwrap();
3629        let bundle_path = temp_dir.path().join("bundle");
3630        std::fs::write(&bundle_path, b"fake bundle").unwrap();
3631
3632        let archive_tempdir = tempfile::Builder::new()
3633            .prefix("test_zip_")
3634            .tempdir()
3635            .unwrap();
3636        let archive_path = archive_tempdir.path().join("repo.zip");
3637        std::fs::write(&archive_path, b"fake zip body").unwrap();
3638
3639        let state = PushReadyState {
3640            remote_ref: r,
3641            local_sha: Sha::from_hex(SHA).unwrap(),
3642            pre_existing: None,
3643            bundle_path,
3644            zip_artifacts: Some(ZipArtifacts {
3645                archive_path,
3646                short_sha: "deadbeef".to_owned(),
3647                commit_msg: "test commit".to_owned(),
3648                _tempdir: archive_tempdir,
3649            }),
3650            engine: StorageEngine::Bundle,
3651            force: false,
3652            pre_existing_was_ancestor: true,
3653            local_spec: "refs/heads/main".to_owned(),
3654            hidden_bundles: HashSet::new(),
3655            _temp_dir: temp_dir,
3656        };
3657
3658        let zip_dest = "repo/refs/heads/main/repo.zip".to_owned();
3659        store.arm(Fault::NetworkOnPutPath {
3660            key: zip_dest.clone(),
3661        });
3662
3663        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3664            .await
3665            .expect("push must succeed even when zip upload fails");
3666        assert!(
3667            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3668            "expected Ok(refs/heads/main), got {outcome:?}",
3669        );
3670
3671        // The bundle reached the bucket — that is the git-protocol
3672        // contract for a successful push.
3673        let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3674        assert!(
3675            store.contains(&bundle_dest),
3676            "new bundle must be uploaded at bundle_dest",
3677        );
3678        // The zip fault fired exactly once — proves put_path was
3679        // attempted and failed.
3680        assert_eq!(store.pending_faults(), 0);
3681        // The zip key is absent — proves the failure was not silently
3682        // swallowed by a retry that masked the regression.
3683        assert!(
3684            !store.contains(&zip_dest),
3685            "zip key must be absent when the upload fault fires",
3686        );
3687    }
3688
3689    /// Issue #161: on S3 the zip-artifact upload must carry the
3690    /// `codepipeline-artifact-revision-summary` user-metadata header so
3691    /// AWS `CodePipeline` can consume the commit summary. On Azure the
3692    /// same hyphenated key is rejected by the service (metadata names
3693    /// must be valid C# identifiers), and the issue #127 swallow path
3694    /// then hides the upload failure — silently dropping every zip
3695    /// artifact. The fix only emits the metadata on S3; this pair of
3696    /// tests pins both halves of the contract against `MockStore`,
3697    /// which records `user_metadata` verbatim and lets us inspect it
3698    /// without standing up a live backend.
3699    #[tokio::test]
3700    async fn perform_push_under_lock_emits_codepipeline_metadata_on_s3() {
3701        let (store, zip_dest) = run_zip_push(BackendKind::S3).await;
3702        let meta = store.metadata(&zip_dest).expect("zip stored");
3703        let summary = meta
3704            .user_metadata
3705            .iter()
3706            .find(|(k, _)| k == "codepipeline-artifact-revision-summary")
3707            .expect("S3 push must attach the CodePipeline revision-summary metadata");
3708        assert_eq!(summary.1, "test commit");
3709    }
3710
3711    #[tokio::test]
3712    async fn perform_push_under_lock_omits_codepipeline_metadata_on_azure() {
3713        let (store, zip_dest) = run_zip_push(BackendKind::Azure).await;
3714        let meta = store.metadata(&zip_dest).expect("zip stored");
3715        assert!(
3716            meta.user_metadata.is_empty(),
3717            "Azure push must not attach hyphenated CodePipeline metadata; \
3718             got {entries:?}",
3719            entries = meta.user_metadata,
3720        );
3721    }
3722
3723    /// Drive a single `?zip=1` push through `perform_push_under_lock` for
3724    /// the given backend kind, returning the store and the zip key so
3725    /// each test can assert on `user_metadata` independently. Centralised
3726    /// here so an accidental drift between the S3 and Azure variants
3727    /// (different `commit_msg`, different prefix, different ref) cannot
3728    /// hide a regression in the metadata wiring.
3729    async fn run_zip_push(kind: BackendKind) -> (MockStore, String) {
3730        let store = MockStore::new();
3731        let r = rn("refs/heads/main");
3732        let temp_dir = tempfile::Builder::new()
3733            .prefix("test_push_")
3734            .tempdir()
3735            .unwrap();
3736        let bundle_path = temp_dir.path().join("bundle");
3737        std::fs::write(&bundle_path, b"fake bundle").unwrap();
3738
3739        let archive_tempdir = tempfile::Builder::new()
3740            .prefix("test_zip_")
3741            .tempdir()
3742            .unwrap();
3743        let archive_path = archive_tempdir.path().join("repo.zip");
3744        std::fs::write(&archive_path, b"fake zip body").unwrap();
3745
3746        let state = PushReadyState {
3747            remote_ref: r,
3748            local_sha: Sha::from_hex(SHA).unwrap(),
3749            pre_existing: None,
3750            bundle_path,
3751            zip_artifacts: Some(ZipArtifacts {
3752                archive_path,
3753                short_sha: "deadbeef".to_owned(),
3754                commit_msg: "test commit".to_owned(),
3755                _tempdir: archive_tempdir,
3756            }),
3757            engine: StorageEngine::Bundle,
3758            force: false,
3759            pre_existing_was_ancestor: true,
3760            local_spec: "refs/heads/main".to_owned(),
3761            hidden_bundles: HashSet::new(),
3762            _temp_dir: temp_dir,
3763        };
3764
3765        let outcome = perform_push_under_lock(&store, Some("repo"), kind, state)
3766            .await
3767            .expect("push must succeed");
3768        assert!(matches!(outcome, PushOutcome::Ok { .. }));
3769        let zip_dest = "repo/refs/heads/main/repo.zip".to_owned();
3770        assert!(
3771            store.contains(&zip_dest),
3772            "zip artifact must land on bucket"
3773        );
3774        (store, zip_dest)
3775    }
3776
3777    // --- delete_remote_ref_under_lock ---------------------------------
3778
3779    /// Issue #133: under the lock, a ref whose only remaining object is
3780    /// the lock key itself reports `"not found"?` rather than the
3781    /// pre-#133 quirk of treating the lock as a bundle. In production
3782    /// the lock here is the one [`acquire_lock`] holds across the call;
3783    /// `release_lock` deletes it after this function returns, so the
3784    /// caller never sees a dangling lock either way.
3785    ///
3786    /// This pins the new contract: the filter on the lock key must
3787    /// short-circuit to `"not found"?` when the lock is the only
3788    /// listed entry, NOT delete it as a bundle.
3789    #[tokio::test]
3790    async fn delete_remote_ref_under_lock_reports_not_found_when_only_lock_present() {
3791        let store = MockStore::new();
3792        let lock_key = "repo/refs/heads/main/LOCK#.lock";
3793        // Simulate the lock we hold across the call (the production
3794        // caller has already acquired it via `acquire_lock`).
3795        store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
3796        let r = rn("refs/heads/main");
3797
3798        let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
3799            .await
3800            .unwrap();
3801
3802        match outcome {
3803            PushOutcome::Error { message, .. } => {
3804                assert_eq!(message, r#""not found"?"#);
3805            }
3806            PushOutcome::Ok { .. } => panic!("expected Error, got Ok"),
3807        }
3808        // The lock key is NOT swept by `delete_remote_ref_under_lock` —
3809        // `release_lock` is responsible for removing it. Pin that here
3810        // so a regression that swept the held lock would fail.
3811        assert!(
3812            store.contains(lock_key),
3813            "delete_remote_ref_under_lock must NOT delete the held lock key",
3814        );
3815    }
3816
3817    /// Issue #133: when a concurrent push lands a NEW bundle between
3818    /// the lock-acquire and our listing inside the lock, the post-lock
3819    /// listing reflects the new bundle. The delete proceeds normally
3820    /// against that bundle (and the lock is filtered out). This pins
3821    /// the close of the race window the issue describes: the listing
3822    /// is now under the lock, so the deletion target is whatever the
3823    /// concurrent writer left behind, not a stale pre-lock snapshot.
3824    #[tokio::test]
3825    async fn delete_remote_ref_under_lock_sweeps_concurrently_landed_bundle() {
3826        let store = MockStore::new();
3827        let r = rn("refs/heads/main");
3828        let lock_key = "repo/refs/heads/main/LOCK#.lock";
3829        // Concurrent push landed this bundle; we hold the lock now.
3830        let bundle = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3831        store.insert(&bundle, Bytes::from_static(b"new"));
3832        store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
3833
3834        let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
3835            .await
3836            .unwrap();
3837
3838        assert_eq!(
3839            outcome,
3840            PushOutcome::Ok {
3841                remote_ref: "refs/heads/main".into()
3842            }
3843        );
3844        assert!(
3845            !store.contains(&bundle),
3846            "concurrently-landed bundle must be swept by the under-lock listing",
3847        );
3848        assert!(
3849            store.contains(lock_key),
3850            "held lock must survive the sweep (release_lock removes it)",
3851        );
3852    }
3853
3854    /// Stale lock is deleted but another client re-acquires it before our
3855    /// retry `put_if_absent`. Must return `Ok(None)` — the caller maps
3856    /// this to a "lock held" user error, not a hard failure.
3857    #[tokio::test]
3858    async fn acquire_lock_stale_retry_loses_second_race() {
3859        use crate::object_store::mock::Fault;
3860        let store = MockStore::new();
3861        let now = OffsetDateTime::now_utc();
3862        let stale = now - Duration::seconds(120);
3863        store.insert_with("k", Bytes::new(), stale, PutOpts::default());
3864        // Another client wins the race between our delete and retry.
3865        store.arm(Fault::ContendedPutIfAbsent { key: "k".into() });
3866        let arc = Arc::new(store);
3867        let guard = acquire_lock(
3868            Arc::clone(&arc) as Arc<dyn ObjectStore>,
3869            "k",
3870            Duration::seconds(60),
3871            now,
3872        )
3873        .await
3874        .unwrap();
3875        assert!(guard.is_none(), "expected contention on the retry race");
3876        // Fault fired — confirms the retry put_if_absent was called.
3877        assert_eq!(arc.pending_faults(), 0);
3878        // The stale lock was removed; no key remains.
3879        assert!(!arc.contains("k"));
3880    }
3881
3882    // --- full_error_chain dedup --------------------------------------
3883
3884    /// Tripwire for the dedup-by-suffix fix: a naive chain-walk on
3885    /// `PushError::Store(ObjectStoreError::Network(_))` produces a
3886    /// duplicated tail because both `PushError::Store` and
3887    /// `ObjectStoreError::Network` inline their immediate source via
3888    /// `{0}` in the `Display` derive. The shared
3889    /// `super::append_source_chain` helper skips levels whose text is
3890    /// already at the tail of the message. A regression that
3891    /// re-introduced the always-append walk would render
3892    /// `"…network error: dns failure: dns failure"` (or even longer
3893    /// for deeper chains), failing this byte-exact assertion.
3894    #[test]
3895    fn full_error_chain_deduplicates_inlined_source_text() {
3896        let inner: crate::object_store::BoxError = Box::new(std::io::Error::other("dns failure"));
3897        let err = PushError::Store(ObjectStoreError::Network(inner));
3898        let rendered = full_error_chain(&err);
3899        assert_eq!(
3900            rendered, "object-store error during push: network error: dns failure",
3901            "PushError::Store(Network(_)) must not duplicate the inner source",
3902        );
3903    }
3904
3905    // --- bundle progress sink wiring (issue #55) -----------------------
3906
3907    /// Decorator around `MockStore` that records, for every `put_path`
3908    /// call, whether `opts.progress` was `Some`. Used to pin the
3909    /// "bundle uploads attach a progress sink" contract from issue
3910    /// #55: a regression that drops the sink from `perform_push_under_lock`
3911    /// would silently regress the only thing `git push` users have to
3912    /// watch a multi-GiB transfer.
3913    ///
3914    /// The decorator forwards every other method to the inner
3915    /// `MockStore` unchanged. Wrapping for the assertion-of-interest
3916    /// alone keeps the test tightly scoped — there is no fault
3917    /// injection, no chunking knob, just a Vec of "did `put_path` get
3918    /// a sink?" booleans keyed by the destination key.
3919    #[derive(Default)]
3920    struct RecordingPutPathStore {
3921        inner: MockStore,
3922        put_path_progress_seen: std::sync::Mutex<Vec<(String, bool)>>,
3923    }
3924
3925    impl RecordingPutPathStore {
3926        fn observed(&self) -> Vec<(String, bool)> {
3927            self.put_path_progress_seen
3928                .lock()
3929                .expect("observation lock")
3930                .clone()
3931        }
3932    }
3933
3934    #[async_trait::async_trait]
3935    impl ObjectStore for RecordingPutPathStore {
3936        async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
3937            self.inner.list(prefix).await
3938        }
3939        async fn get_to_file(
3940            &self,
3941            key: &str,
3942            dest: &Path,
3943            opts: crate::object_store::GetOpts,
3944        ) -> Result<(), ObjectStoreError> {
3945            self.inner.get_to_file(key, dest, opts).await
3946        }
3947        async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
3948            self.inner.get_bytes(key).await
3949        }
3950        async fn get_bytes_range(
3951            &self,
3952            key: &str,
3953            range: std::ops::Range<u64>,
3954        ) -> Result<Bytes, ObjectStoreError> {
3955            self.inner.get_bytes_range(key, range).await
3956        }
3957        async fn put_bytes(
3958            &self,
3959            key: &str,
3960            body: Bytes,
3961            opts: PutOpts,
3962        ) -> Result<(), ObjectStoreError> {
3963            self.inner.put_bytes(key, body, opts).await
3964        }
3965        async fn put_path(
3966            &self,
3967            key: &str,
3968            src: &Path,
3969            opts: PutOpts,
3970        ) -> Result<(), ObjectStoreError> {
3971            self.put_path_progress_seen
3972                .lock()
3973                .expect("observation lock")
3974                .push((key.to_owned(), opts.progress.is_some()));
3975            self.inner.put_path(key, src, opts).await
3976        }
3977        async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
3978            self.inner.put_if_absent(key, body).await
3979        }
3980        async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
3981            self.inner.head(key).await
3982        }
3983        async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
3984            self.inner.copy(src, dst).await
3985        }
3986        async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
3987            self.inner.delete(key).await
3988        }
3989    }
3990
3991    /// `perform_push_under_lock` must attach a `ProgressSink` to the
3992    /// bundle `put_path` so `git push` users see motion during a slow
3993    /// upload (issue #55). Before the fix, this call passed
3994    /// `PutOpts::default()` and the user saw nothing for hours on a
3995    /// 20 GiB push.
3996    #[tokio::test]
3997    async fn perform_push_under_lock_attaches_progress_sink_to_bundle_put_path() {
3998        let store = RecordingPutPathStore::default();
3999        let r = rn("refs/heads/main");
4000        let temp_dir = tempfile::Builder::new()
4001            .prefix("test_push_progress_")
4002            .tempdir()
4003            .unwrap();
4004        let bundle_path = temp_dir.path().join("bundle");
4005        std::fs::write(&bundle_path, b"fake bundle").unwrap();
4006        let state = PushReadyState {
4007            remote_ref: r,
4008            local_sha: Sha::from_hex(SHA).unwrap(),
4009            pre_existing: None,
4010            bundle_path,
4011            zip_artifacts: None,
4012            engine: StorageEngine::Bundle,
4013            force: false,
4014            pre_existing_was_ancestor: true,
4015            local_spec: "refs/heads/main".to_owned(),
4016            hidden_bundles: HashSet::new(),
4017            _temp_dir: temp_dir,
4018        };
4019        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4020            .await
4021            .unwrap();
4022        assert!(
4023            matches!(outcome, PushOutcome::Ok { .. }),
4024            "expected Ok outcome",
4025        );
4026        let observed = store.observed();
4027        // Exactly one `put_path` call (no zip artifacts in this fixture).
4028        // Pinning ≥1 rather than ==1 guards against future test
4029        // refactors that legitimately add another upload — the contract
4030        // we care about is that *every* bundle-class upload carries a
4031        // sink.
4032        assert!(
4033            !observed.is_empty(),
4034            "perform_push_under_lock must call put_path for the bundle",
4035        );
4036        for (key, has_sink) in &observed {
4037            assert!(
4038                has_sink,
4039                "put_path for `{key}` must carry a ProgressSink (issue #55)",
4040            );
4041        }
4042    }
4043
4044    /// `bundle_progress_sink` accepts an arbitrary stream of
4045    /// `report(amount)` calls without panicking and tolerates `None`
4046    /// for `total` (the stat-failed fallback). Pins the sink's
4047    /// public-shape contract: a regression that switched the
4048    /// `Option<u64>` to a non-optional `u64` would force every caller
4049    /// to handle stat errors, breaking the "graceful degradation"
4050    /// note in the helper's doc comment.
4051    #[test]
4052    fn bundle_progress_sink_accepts_reports_without_panicking() {
4053        let with_total = bundle_progress_sink("repo/bundle.bundle", Some(1_024));
4054        with_total.report(256);
4055        with_total.report(256);
4056        with_total.report(512);
4057
4058        let without_total = bundle_progress_sink("repo/bundle.bundle", None);
4059        without_total.report(1);
4060        without_total.report(u64::MAX); // saturates rather than wraps
4061    }
4062
4063    /// Pin the not-ancestor wire token at the Rust level. The shellspec
4064    /// suites (`spec/integration/{s3,az}/force_push_spec.sh`,
4065    /// `spec/live/s3/force_push_spec.sh`) assert on the literal
4066    /// substring `"not ancestor"` — but they only run via `make
4067    /// shellspec-*` targets, not the default `cargo test --workspace`
4068    /// gate. This test catches a Rust dev who renames the constant
4069    /// without updating the spec files.
4070    #[test]
4071    fn not_ancestor_token_value_is_stable() {
4072        assert_eq!(
4073            NOT_ANCESTOR_TOKEN, "not ancestor",
4074            "spec/{{integration,live}}/*/force_push_spec.sh asserts on this exact substring",
4075        );
4076        let formatted = format!(r#""remote ref is {NOT_ANCESTOR_TOKEN} of refs/heads/main."?"#);
4077        assert!(
4078            formatted.contains(NOT_ANCESTOR_TOKEN),
4079            "the not-ancestor PushOutcome::Error message must embed the token literally; got {formatted:?}",
4080        );
4081    }
4082
4083    /// CRLF in a commit-message summary would split the
4084    /// `x-amz-meta-codepipeline-artifact-revision-summary` (or
4085    /// `x-ms-meta-…`) header on the wire, letting a forged commit
4086    /// inject arbitrary user metadata onto the uploaded zip
4087    /// archive. `sanitize_metadata_value` must collapse every ASCII
4088    /// control byte (CR, LF, NUL, HTAB, …) to a space so the
4089    /// resulting header value is single-line and free of injection
4090    /// payloads.
4091    #[test]
4092    fn sanitize_metadata_value_strips_control_chars() {
4093        assert_eq!(
4094            sanitize_metadata_value("hello\r\nX-Injected: yes"),
4095            "hello  X-Injected: yes",
4096        );
4097        assert_eq!(sanitize_metadata_value("nul\0byte"), "nul byte");
4098        assert_eq!(sanitize_metadata_value("plain text"), "plain text");
4099        assert_eq!(
4100            sanitize_metadata_value("café — short summary"),
4101            "café — short summary",
4102            "non-ASCII printable characters must pass through unchanged",
4103        );
4104        assert_eq!(sanitize_metadata_value(""), "");
4105    }
4106
4107    // --- Issue #129: force-push protection check runs under the lock ---
4108
4109    /// Regression for issue #129. If a concurrent `protect` lands a
4110    /// `PROTECTED#` marker between the pre-lock work in `prepare_push`
4111    /// and the lock acquisition in `push_one`, the under-lock arm of
4112    /// `perform_push_under_lock` must observe it and reject a non-FF
4113    /// force-push with the same `NotAncestor` wire token the pre-lock
4114    /// non-force probe would have produced.
4115    ///
4116    /// `pre_existing_was_ancestor = false` represents "this force-push
4117    /// is NOT a fast-forward" — exactly the case the historical
4118    /// protection-demotion logic rejected. The `PROTECTED#` key is
4119    /// seeded after the would-be pre-lock check (we simulate that by
4120    /// constructing the state with `force=true` and seeding the marker
4121    /// alongside the matching pre-existing bundle).
4122    #[tokio::test]
4123    async fn perform_push_under_lock_rejects_force_when_protected_under_lock_and_not_ff() {
4124        let store = MockStore::new();
4125        let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
4126        store.insert(&pre_key, Bytes::from_static(b"old bundle"));
4127        // Concurrent `protect` lands the marker before we get the lock.
4128        store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
4129        let mut state = push_state_with_pre_existing(Some(pre_key.clone()));
4130        state.force = true;
4131        state.pre_existing_was_ancestor = false;
4132        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4133            .await
4134            .unwrap();
4135        assert!(
4136            matches!(
4137                &outcome,
4138                PushOutcome::Error { message, .. }
4139                    if message == r#""remote ref is not ancestor of refs/heads/main."?"#
4140            ),
4141            "expected under-lock NotAncestor refusal, got {outcome:?}",
4142        );
4143        // The protection marker must survive — the engine never touches
4144        // protect/unprotect state. A regression that swept it on the
4145        // refusal path would silently downgrade a server's protection.
4146        assert!(store.contains("repo/refs/heads/main/PROTECTED#"));
4147        // The pre-existing bundle must survive intact: a refusal path
4148        // that erroneously progressed past the protection check could
4149        // overwrite or delete it.
4150        let local_sha = SHA;
4151        assert!(store.contains(&pre_key));
4152        assert!(
4153            !store.contains(&format!("repo/refs/heads/main/{local_sha}.bundle")),
4154            "refused push must not upload the new bundle",
4155        );
4156    }
4157
4158    /// Companion to the rejection case: when the user's local tip IS a
4159    /// fast-forward of the pre-existing remote bundle (`pre_existing_was_ancestor
4160    /// = true`), the historical "protected ref + force" semantic is to
4161    /// proceed — protection only blocks non-fast-forward force-pushes.
4162    /// This test pins that branch: a `PROTECTED#` marker under the
4163    /// lock plus a FF push must still succeed.
4164    #[tokio::test]
4165    async fn perform_push_under_lock_allows_force_when_protected_under_lock_but_ff() {
4166        let store = MockStore::new();
4167        let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
4168        store.insert(&pre_key, Bytes::from_static(b"old bundle"));
4169        store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
4170        let mut state = push_state_with_pre_existing(Some(pre_key.clone()));
4171        state.force = true;
4172        state.pre_existing_was_ancestor = true; // FF case.
4173        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4174            .await
4175            .unwrap();
4176        assert!(
4177            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
4178            "FF push must pass even when protected, got {outcome:?}",
4179        );
4180        // Protection marker still in place after the push.
4181        assert!(store.contains("repo/refs/heads/main/PROTECTED#"));
4182    }
4183
4184    /// Companion to the rejection case: a legitimate force-push (force,
4185    /// non-FF, no `PROTECTED#` marker) must proceed. This pins the
4186    /// polarity of the AND-clause guarding the protection rejection — a
4187    /// regression that dropped the `is_protected` check from the
4188    /// condition would refuse every non-FF force-push, not just those
4189    /// against protected refs.
4190    #[tokio::test]
4191    async fn perform_push_under_lock_allows_force_when_not_ancestor_and_not_protected() {
4192        let store = MockStore::new();
4193        let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
4194        store.insert(&pre_key, Bytes::from_static(b"old bundle"));
4195        // No PROTECTED# marker.
4196        let mut state = push_state_with_pre_existing(Some(pre_key.clone()));
4197        state.force = true;
4198        state.pre_existing_was_ancestor = false; // non-FF force-push.
4199        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4200            .await
4201            .unwrap();
4202        assert!(
4203            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
4204            "legitimate non-FF force-push must proceed, got {outcome:?}",
4205        );
4206        // A passing push uploads the new bundle keyed by `local_sha`.
4207        let local_sha = SHA;
4208        assert!(
4209            store.contains(&format!("repo/refs/heads/main/{local_sha}.bundle")),
4210            "new bundle must be uploaded on a successful force-push",
4211        );
4212        // Defensive: the NotAncestor wire token must NOT appear anywhere
4213        // in the outcome — that would mean the guard mis-fired.
4214        if let PushOutcome::Error { message, .. } = &outcome {
4215            assert!(
4216                !message.contains("not ancestor"),
4217                "force-push without protection must not emit NotAncestor: {message}",
4218            );
4219        }
4220    }
4221
4222    /// A non-force push (`force=false`) must never consult `is_protected`
4223    /// under the lock: non-FF non-force pushes were already rejected
4224    /// pre-lock by the ancestry probe, and FF non-force pushes are
4225    /// unaffected by protection. The under-lock check is gated on
4226    /// `force` precisely so this round-trip costs zero extra HEAD calls.
4227    ///
4228    /// Test-design note: `pre_existing_was_ancestor` is forced to `false`
4229    /// here so that ONLY the `force` clause of the under-lock guard
4230    /// (`force && !pre_existing_was_ancestor && is_protected(...)`)
4231    /// keeps the protection check off. A regression that drops the
4232    /// `force &&` clause would flip the guard to true and fail this
4233    /// test; if `pre_existing_was_ancestor` were left as `true`, the
4234    /// `!pre_existing_was_ancestor` clause would short-circuit and
4235    /// hide that regression. The `pre_existing` bundle key is omitted
4236    /// to keep the FF-vs-non-FF semantics consistent with "no prior
4237    /// remote SHA, therefore not an ancestor".
4238    #[tokio::test]
4239    async fn perform_push_under_lock_skips_protection_check_for_non_force() {
4240        let store = MockStore::new();
4241        // Marker present, but a non-force push must not even probe for it.
4242        store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
4243        let mut state = push_state_with_pre_existing(None);
4244        state.force = false;
4245        state.pre_existing_was_ancestor = false;
4246        let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4247            .await
4248            .unwrap();
4249        assert!(
4250            matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
4251            "non-force push must pass regardless of protection: {outcome:?}",
4252        );
4253    }
4254
4255    // -----------------------------------------------------------------
    // Issue #151 — bundle-engine delete must not miss a `PROTECTED#`
    // marker written after the under-lock listing. The primary defence
    // is the per-ref lock (#159 made `protect`/`unprotect` acquire the
    // same key the delete holds). These tests pin that the post-sweep
    // defensive verification (`verify_no_orphan_protected_after_delete`)
    // is silent on the happy path. They also pin that it leaves the
    // bucket untouched when it does observe an orphan marker.
4262    // -----------------------------------------------------------------
4263
4264    #[tokio::test]
4265    async fn issue_151_clean_delete_passes_post_sweep_verification() {
4266        // Happy path with the lock contract honoured: bundle present,
4267        // no marker, lock held. The sweep deletes the bundle and the
4268        // post-sweep `head(PROTECTED#)` returns NotFound — silently —
4269        // and the delete reports `ok`. A regression that promoted the
4270        // post-sweep probe into a hard error would surface here.
4271        let store = MockStore::new();
4272        let r = rn("refs/heads/main");
4273        let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
4274        let lock_key = "repo/refs/heads/main/LOCK#.lock";
4275        store.insert(&bundle, Bytes::from_static(b"b"));
4276        store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
4277
4278        let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
4279            .await
4280            .unwrap();
4281        assert_eq!(
4282            outcome,
4283            PushOutcome::Ok {
4284                remote_ref: "refs/heads/main".into()
4285            },
4286            "clean delete must report ok after the post-sweep probe",
4287        );
4288        assert!(!store.contains(&bundle), "bundle must be swept");
4289        assert!(
4290            store.contains(lock_key),
4291            "lock survives the sweep (release removes it)",
4292        );
4293    }
4294
4295    /// Helper unit test for [`verify_no_orphan_protected_after_delete`]:
4296    /// the helper itself must not error or panic when the marker is
4297    /// absent, and the call must be cheap (a single `head`). Pinned
4298    /// here so a future refactor of the helper cannot regress its
4299    /// contract — the delete paths rely on this being a no-op on
4300    /// the happy path.
4301    #[tokio::test]
4302    async fn verify_no_orphan_protected_after_delete_is_noop_when_marker_absent() {
4303        use crate::object_store::mock::Fault;
4304        let store = MockStore::new();
4305        let r = rn("refs/heads/main");
4306        // Arm a one-shot transient HEAD fault on the marker key. The
4307        // helper MUST issue a `head()` on that key; the fault is the
4308        // witness. A regression that turned the helper into a literal
4309        // no-op would leave the fault unconsumed and fail the
4310        // pending-faults assertion below. Without this witness, the
4311        // "marker absent → marker absent" assertion is vacuous.
4312        store.arm(Fault::NetworkOnHead {
4313            key: "repo/refs/heads/main/PROTECTED#".to_owned(),
4314        });
4315        verify_no_orphan_protected_after_delete(&store, Some("repo"), &r).await;
4316        assert_eq!(
4317            store.pending_faults(),
4318            0,
4319            "helper must call head() on the marker key — fault unconsumed",
4320        );
4321        // The transient HEAD error goes through the `debug!` branch and
4322        // the helper returns silently; the bucket stays unchanged.
4323        assert!(
4324            !store.contains("repo/refs/heads/main/PROTECTED#"),
4325            "helper must not touch the bucket",
4326        );
4327    }
4328
4329    /// When the helper observes a marker — the lock-contract-violation
4330    /// branch — it logs at `error!` and returns silently. The bucket
4331    /// state must be unchanged (no rollback). This is the
4332    /// belt-and-suspenders branch the issue's race scenario would
4333    /// reach if a future regression bypassed the lock.
4334    #[tokio::test]
4335    async fn verify_no_orphan_protected_after_delete_does_not_mutate_when_marker_present() {
4336        let store = MockStore::new();
4337        let r = rn("refs/heads/main");
4338        let marker = "repo/refs/heads/main/PROTECTED#";
4339        store.insert(marker, Bytes::new());
4340        verify_no_orphan_protected_after_delete(&store, Some("repo"), &r).await;
4341        // The helper logs but does NOT delete the marker: rollback is
4342        // not the helper's job (the delete is already complete; the
4343        // operator-visible "ref is gone" outcome stands; the orphan
4344        // marker is surveillance telemetry).
4345        assert!(
4346            store.contains(marker),
4347            "helper must not delete the orphan marker — surveillance only",
4348        );
4349    }
4350}