git_remote_object_store/protocol/push.rs
1//! `push` handler with per-ref locking via conditional writes.
2//!
3//! Sequential-batch semantics: every `push <refspec>` line in a batch
4//! is processed in order under its own per-ref lock, and one outcome
5//! line is emitted per push (`ok <ref>\n` or `error <ref> "msg"\n`).
6//! `gix::Repository` is `!Sync` so the handler holds the repo handle
7//! on a single task — pushes never run in parallel within one client.
8//!
9//! Stdout discipline: this module returns [`PushOutcome`] values and
10//! never writes to the protocol stream itself. The REPL renders each
11//! outcome and the trailing blank-line terminator (see
12//! `.claude/rules/protocol-stdout.md`).
13
14use std::collections::HashSet;
15use std::env;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19
20use bytes::Bytes;
21use time::{Duration, OffsetDateTime};
22use tokio::sync::watch;
23use tokio::task::JoinHandle;
24use tracing::{debug, info, warn};
25
26use crate::git::{self, GitError, RefName, RefNameError, Sha, ShaError, is_valid_ref_name};
27use crate::keys;
28use crate::object_store::{ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts};
29use crate::packchain::gc::{tombstoned_bundle_keys, write_baseline_tombstone_best_effort};
30use crate::packchain::schema::Sha40;
31use crate::url::{BackendKind, StorageEngine};
32
/// Fallback per-ref lock TTL, in seconds, applied whenever
/// [`ENV_LOCK_TTL_SECONDS`] is missing or cannot be parsed.
///
/// This constant is the single source of truth for the lock TTL. It is
/// consumed by `manage::doctor`'s stale-lock predicate, by the CLI's
/// `--lock-ttl-seconds` default (resolved via [`lock_ttl_from_env`] when
/// the flag is not given), and by the integration tests that pin the
/// wire format. The management surface re-exports it as
/// [`crate::manage::DEFAULT_LOCK_TTL_SECONDS`], so every notion of
/// "stale" stays in agreement instead of drifting silently.
pub const DEFAULT_LOCK_TTL_SECONDS: u64 = 60;
42
43/// Push configuration that is constant across an entire batch.
44///
45/// Bundles the `zip`, `engine`, and `ttl` parameters so that [`push_one`]
46/// and [`perform_push_under_lock`] stay within the argument-count budget.
47struct PushConfig {
48 /// Whether to upload `repo.zip` alongside each bundle.
49 zip: bool,
50 /// Storage engine to lock into the `FORMAT` key on the first push.
51 engine: StorageEngine,
52 /// Per-ref lock TTL.
53 ttl: Duration,
54 /// Backend kind of the destination. Used by the zip-artifact path to
55 /// decide whether to attach the `codepipeline-artifact-revision-summary`
56 /// user-metadata header — that header is meaningful only on S3
57 /// (AWS `CodePipeline` consumes it) and the hyphenated key is invalid on
58 /// Azure, where it would otherwise cause the entire zip upload to fail
59 /// silently under the issue #127 best-effort swallow contract.
60 /// See issue #161.
61 kind: BackendKind,
62}
63
/// Name of the environment variable that overrides the lock TTL; its
/// value is interpreted as a number of seconds.
pub(crate) const ENV_LOCK_TTL_SECONDS: &str = "GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS";
66
/// Fixed token that appears inside the rejection message produced when
/// the remote ref is not an ancestor of the pushed local ref.
///
/// The token is a user-facing contract: the shellspec suites match on
/// it to tell the ancestor-mismatch failure apart from unrelated push
/// failures (network, permission, malformed URL, ...). The sentence
/// around it may be reworded freely, but the token itself must not
/// change unless every call site is updated too — including the spec
/// files under `spec/integration/*/force_push_spec.sh` and
/// `spec/live/*/force_push_spec.sh`.
pub(crate) const NOT_ANCESTOR_TOKEN: &str = "not ancestor";

/// Render the canonical wire-format rejection used when a push is
/// refused because the remote ref is not an ancestor of `local_spec`.
///
/// Both the bundle and the packchain engines call this one template.
/// They used to carry byte-identical inline `format!` calls, and any
/// silent divergence between the two would have yielded
/// engine-dependent wire output that the spec suites could not match
/// against [`NOT_ANCESTOR_TOKEN`].
pub(crate) fn not_ancestor_wire_message(local_spec: &str) -> String {
    format!(
        r#""remote ref is {token} of {local_spec}."?"#,
        token = NOT_ANCESTOR_TOKEN,
    )
}
88
/// Wire-format message sent back when a delete is refused because a
/// `PROTECTED#` marker sits under the ref.
///
/// The bundle engine ([`delete_remote_ref_under_lock`]) and the
/// packchain engine ([`crate::packchain`]'s `delete_remote_ref_packchain`)
/// both use this constant, so git and other clients see identical bytes
/// from either engine. Keeping one definition removes the
/// duplicate-literal drift that a `const _` byte-equality guard
/// previously had to defend against.
pub(crate) const DELETE_PROTECTION_MESSAGE: &str = concat!(
    r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> "#,
    r#"to remove protection before deleting."?"#,
);
97
98/// Errors surfaced by the push path. These abort the helper — per-ref
99/// failures (multi-bundle, ancestor mismatch, lock contention, ...) are
100/// returned as [`PushOutcome::Error`] without aborting the batch.
101#[derive(Debug, thiserror::Error)]
102pub enum PushError {
103 /// `push <refspec>` line could not be parsed.
104 #[error("invalid push command {line:?}: expected `[+]<src>:<dst>`")]
105 Parse {
106 /// The offending line payload (after the `push ` prefix).
107 line: String,
108 },
109
110 /// Local rev-spec failed permissive ref-name validation.
111 #[error("invalid local ref-spec: {0:?}")]
112 InvalidLocalSpec(String),
113
114 /// Remote ref name is malformed.
115 #[error("invalid remote ref: {0}")]
116 RemoteRef(#[from] RefNameError),
117
118 /// SHA hex extracted from a stored bundle key was malformed.
119 #[error("invalid SHA in bundle key: {0}")]
120 Sha(#[from] ShaError),
121
122 /// Object-store transport / auth failure.
123 #[error("object-store error during push: {0}")]
124 Store(#[from] ObjectStoreError),
125
126 /// Local git operation failed (rev-parse, bundle, archive).
127 #[error("git error during push: {0}")]
128 Git(#[from] GitError),
129
130 /// Local I/O failure (tempdir, file read).
131 #[error("local I/O error during push: {0}")]
132 Io(#[from] std::io::Error),
133
134 /// Packchain-engine-specific failure surfaced by
135 /// [`crate::packchain::push::push_batch`]. Wrapped here so the
136 /// per-ref `error <ref>` arm in [`push_batch`] can render a
137 /// uniform wire line regardless of which engine produced the
138 /// error.
139 #[error("packchain engine error during push: {0}")]
140 Packchain(#[from] crate::packchain::PackchainError),
141}
142
/// Outcome of one push inside a batch.
///
/// The REPL renders each value on stdout as `ok <ref>\n` or
/// `error <ref> <msg>\n`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PushOutcome {
    /// The push went through. `remote_ref` is echoed back so git can
    /// mark the local ref as updated.
    Ok {
        /// Remote ref that was pushed, in its unparsed wire form.
        remote_ref: String,
    },
    /// The push was refused. `message` is the free-form reason that
    /// follows the ref name on the wire.
    Error {
        /// Remote ref the rejection refers to.
        remote_ref: String,
        /// Human-readable reason for the rejection.
        message: String,
    },
}

impl PushOutcome {
    /// Render `self` as the single stdout line for this outcome,
    /// including the trailing newline.
    #[must_use]
    pub(crate) fn to_protocol_line(&self) -> String {
        // Split the outcome into its status word, target ref, and
        // optional reason, then assemble the line in one place.
        let (status, target, reason) = match self {
            Self::Ok { remote_ref } => ("ok", remote_ref, None),
            Self::Error {
                remote_ref,
                message,
            } => ("error", remote_ref, Some(message)),
        };
        let mut line = format!("{status} {target}");
        if let Some(reason) = reason {
            line.push(' ');
            line.push_str(reason);
        }
        line.push('\n');
        line
    }
}
177
178/// Parsed `push` command line.
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub(crate) struct PushSpec {
181 /// `+` was present — the user requested a force push.
182 pub(crate) force: bool,
183 /// User-supplied local rev-spec. Empty means "delete the remote ref".
184 pub(crate) local_spec: String,
185 /// Strict, fully-qualified remote ref.
186 pub(crate) remote_ref: RefName,
187}
188
189/// Parse the payload of a `push <refspec>` line (the bytes after the
190/// `push ` prefix have already been stripped by the REPL).
191pub(crate) fn parse_push_args(args: &str) -> Result<PushSpec, PushError> {
192 let parse_err = || PushError::Parse {
193 line: args.to_owned(),
194 };
195 if args.is_empty() || args.contains(' ') {
196 return Err(parse_err());
197 }
198 let (local, remote) = args.split_once(':').ok_or_else(parse_err)?;
199 if remote.is_empty() {
200 return Err(parse_err());
201 }
202 let (force, local) = match local.strip_prefix('+') {
203 Some(rest) => (true, rest),
204 None => (false, local),
205 };
206 if !local.is_empty() && !is_valid_ref_name(local) {
207 return Err(PushError::InvalidLocalSpec(local.to_owned()));
208 }
209 let remote_ref = RefName::new(remote)?;
210 Ok(PushSpec {
211 force,
212 local_spec: local.to_owned(),
213 remote_ref,
214 })
215}
216
217/// Build the `<prefix>/<ref>/` listing prefix used by lock and bundle
218/// listings. Empty / absent prefix collapses to a bare `<ref>/`.
219///
220/// Thin typed wrapper over [`keys::ref_listing_prefix`] that takes the
221/// validated `RefName` newtype so helper-protocol call sites can't pass
222/// an unvalidated string. The shared helper is the single allocation
223/// point — `manage::doctor` and `manage::branch` reach it directly with
224/// already-validated `&str` ref paths.
225pub(crate) fn ref_listing_prefix(prefix: Option<&str>, remote_ref: &RefName) -> String {
226 keys::ref_listing_prefix(prefix, remote_ref.as_str())
227}
228
229/// Build the lock key: `<prefix>/<ref>/LOCK#.lock`.
230pub(crate) fn lock_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
231 format!("{}LOCK#.lock", ref_listing_prefix(prefix, remote_ref))
232}
233
234/// Build the zip-archive key: `<prefix>/<ref>/repo.zip`.
235fn archive_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
236 format!("{}repo.zip", ref_listing_prefix(prefix, remote_ref))
237}
238
239/// Build the HEAD key: `<prefix>/HEAD` (no slash when prefix is absent).
240pub(crate) fn head_key(prefix: Option<&str>) -> String {
241 keys::join(prefix, "HEAD")
242}
243
244/// Bundle-candidate filter: positive predicate — the final path segment
245/// must be `<sha>.bundle` (40 lower-hex chars + `.bundle`). All other
246/// per-ref siblings (`repo.zip`, `LOCK#.lock`, `PROTECTED#`, any
247/// hypothetical future artefact) are rejected by construction.
248///
249/// Earlier revisions filtered by full-key substring (`.zip`, `/LOCKS/`,
250/// `.lock`), which false-positively dropped real bundle keys whose ref
251/// name happened to contain those substrings — e.g.
252/// `refs/heads/v1.zip-rc1/<sha>.bundle` or
253/// `refs/heads/LOCKS-feature/x/<sha>.bundle`. A positive check on the
254/// final segment cannot misfire on ref-name content.
255fn is_bundle_candidate(key: &str) -> bool {
256 parse_remote_sha_from_key(key).is_some()
257}
258
259/// Returns every bundle object currently stored under `remote_ref`,
260/// filtered by [`is_bundle_candidate`] and excluding bundles named by
261/// any `<prefix>/gc/baseline-tomb-*.json` (issue #157). The store's
262/// listing prefix is `<prefix>/<ref>/` so sibling-ref keys don't leak
263/// in.
264///
265/// Tombstoned bundles are filtered here because the bundle engine
266/// uses the bundle-key listing as its source of truth for "ref →
267/// current SHA". A force-push that deferred the prior bundle leaves
268/// two `<sha>.bundle` keys under the ref; without the tombstone
269/// filter the next push's under-lock multi-bundle guard would refuse
270/// it. The tombstoned bundle stays readable at its original key for
271/// in-flight fetchers — only the listing path hides it.
272///
273/// Issue #165: `cached_hidden` lets the caller supply a tombstone set
274/// computed earlier in the same push (e.g. by [`prepare_push`]) and
275/// reused under the per-ref lock by [`perform_push_under_lock`]. The
276/// cached set is sound because all tombstone writers for a given ref
277/// (push's `defer_prior_bundle_via_tombstone`, compact's
278/// `tombstone_prior_baseline_bundle`, delete-branch's orphan path)
279/// run under the same per-ref lock — so no new tombstone for this
280/// ref can land between the pre-lock and under-lock calls within one
281/// `push_one` invocation. `None` keeps the original "fetch on demand"
282/// shape used by call sites that don't have a cache to share.
283async fn bundles_for_ref(
284 store: &dyn ObjectStore,
285 prefix: Option<&str>,
286 remote_ref: &RefName,
287 cached_hidden: Option<&HashSet<String>>,
288) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
289 let listing = ref_listing_prefix(prefix, remote_ref);
290 let metas = store.list(&listing).await?;
291 let bundles: Vec<ObjectMeta> = metas
292 .into_iter()
293 .filter(|m| is_bundle_candidate(&m.key))
294 .collect();
295 // Short-circuit the tombstone lookup when there is nothing that
296 // could be filtered. The common case (no force-push in the last
297 // grace window) pays no extra listing cost.
298 if bundles.is_empty() {
299 return Ok(bundles);
300 }
301 // Reuse the caller-supplied set when present; otherwise fetch
302 // on demand. The owned binding outlives the borrow so a single
303 // `&HashSet` covers both paths without an `Option`/`expect` dance.
304 let fetched;
305 let hidden: &HashSet<String> = if let Some(h) = cached_hidden {
306 h
307 } else {
308 fetched = tombstoned_bundle_keys(store, prefix).await?;
309 &fetched
310 };
311 Ok(bundles
312 .into_iter()
313 .filter(|m| !hidden.contains(&m.key))
314 .collect())
315}
316
317/// Returns `true` iff a `<prefix>/<ref>/PROTECTED#` marker exists.
318///
319/// Uses an exact-key `head` rather than a prefix `list`. `ObjectStore::list`
320/// is a byte-prefix match, so a `list("…/PROTECTED#")` would also return any
321/// future `PROTECTED#`-prefixed sibling key (e.g. `PROTECTED#audit`). The
322/// equality check here matches the semantics of the canonical
323/// `is_protected_marker_segment` snapshot-side helper and avoids
324/// spuriously blocking pushes or deletes on unprotected refs.
325pub(crate) async fn is_protected(
326 store: &dyn ObjectStore,
327 prefix: Option<&str>,
328 remote_ref: &RefName,
329) -> Result<bool, ObjectStoreError> {
330 let key = protected_marker_key(prefix, remote_ref);
331 match store.head(&key).await {
332 Ok(_) => Ok(true),
333 Err(ObjectStoreError::NotFound(_)) => Ok(false),
334 Err(e) => Err(e),
335 }
336}
337
338/// Build the `<prefix>/<ref>/PROTECTED#` key. Pulled out so the
339/// pre-delete protection probe ([`is_protected`]) and the post-sweep
340/// integrity verification ([`verify_no_orphan_protected_after_delete`])
341/// share one source of truth for the marker key shape.
342pub(crate) fn protected_marker_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
343 format!(
344 "{}{}",
345 ref_listing_prefix(prefix, remote_ref),
346 keys::PROTECTED_MARKER_SEGMENT,
347 )
348}
349
350/// Issue #151 defence-in-depth: after a delete-path sweep completes,
351/// confirm no `PROTECTED#` marker exists for `remote_ref`. The primary
352/// defence is the per-ref lock (#158, #159): every delete path acquires
353/// `<prefix>/<ref>/LOCK#.lock` before its under-lock listing and the
354/// sweep, and both `protect` and `unprotect` acquire the same key —
355/// so a marker landing between the under-lock listing and the sweep is
356/// mechanically impossible.
357///
358/// This helper is the belt to the lock's suspenders: if the marker is
359/// observed here, the contract was violated (a lock bypass, bucket-level
360/// inconsistency, or a misbehaving sibling tool), and we surface it as
361/// a structured `tracing::error!` so operators can investigate. The
362/// delete is NOT rolled back — the operator-visible "ref is gone"
363/// outcome stands; the orphan marker, if real, will block any future
364/// recreation of the same ref until `unprotect` removes it.
365///
366/// Uses one `head` (not a `list`) — cheap, exact-key, identical shape
367/// to [`is_protected`]. The caller passes the same `prefix` / `remote_ref`
368/// it used to compute the listing prefix so the marker key matches the
369/// sweep's namespace byte-for-byte.
370pub(crate) async fn verify_no_orphan_protected_after_delete(
371 store: &dyn ObjectStore,
372 prefix: Option<&str>,
373 remote_ref: &RefName,
374) {
375 let key = protected_marker_key(prefix, remote_ref);
376 match store.head(&key).await {
377 Ok(_) => {
378 tracing::error!(
379 key = %key,
380 remote_ref = %remote_ref.as_str(),
381 "delete path observed a PROTECTED# marker after sweep; the per-ref lock contract (#158, #159) should make this impossible — investigate for lock bypass or bucket-level inconsistency",
382 );
383 }
384 Err(ObjectStoreError::NotFound(_)) => {}
385 Err(e) => {
386 // The probe itself failed; this is not the integrity
387 // violation we are guarding against. Log at `debug!` so
388 // operators have the trail if a future incident needs it,
389 // but do not promote a transient `head` failure to an
390 // error — the delete already succeeded under the lock.
391 tracing::debug!(
392 key = %key,
393 error = %e,
394 "post-sweep PROTECTED# probe failed; cannot verify orphan-marker invariant for this delete",
395 );
396 }
397 }
398}
399
400/// Extract the SHA from a `<…>/<sha>.bundle` key. Returns `None` if the
401/// trailing segment does not match `[0-9a-f]{40}\.bundle`.
402///
403/// Charset/length validation goes through [`keys::is_valid_bundle_stem`]
404/// so the doctor's malformed-key detector and this parser agree on
405/// what counts as a well-formed stem. The defensive parse-error arm in
406/// [`prepare_push`] still calls this — although push's pre-lock listing
407/// filters malformed stems via [`is_bundle_candidate`] before reaching
408/// the arm, the guard remains in place so a future caller that bypasses
409/// the filter cannot silently treat a malformed key as a bundle.
410fn parse_remote_sha_from_key(key: &str) -> Option<Sha> {
411 let stem = parse_remote_sha_stem_from_key(key)?;
412 Sha::from_hex(stem).ok()
413}
414
415/// Extract the validated 40-lowercase-hex stem from a `<…>/<sha>.bundle`
416/// key without the `&str → Sha → String` round-trip
417/// [`parse_remote_sha_from_key`] incurs. Callers that need the schema
418/// [`Sha40`] (e.g. the tombstone writers) save one hex-encode + one
419/// re-validation by going through the borrowed stem directly.
420fn parse_remote_sha_stem_from_key(key: &str) -> Option<&str> {
421 let last = key.rsplit('/').next()?;
422 let stem = last.strip_suffix(".bundle")?;
423 if !keys::is_valid_bundle_stem(stem) {
424 return None;
425 }
426 Some(stem)
427}
428
429/// Read the lock TTL from `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS`,
430/// falling back to [`DEFAULT_LOCK_TTL_SECONDS`] if the env var is unset,
431/// unparseable, or zero. A zero TTL would make `acquire_lock` treat
432/// every held lock as instantly stale and defeat per-ref locking, so
433/// we mirror [`crate::packchain::gc::grace_hours_from_env`] and clamp
434/// it to the default.
435pub(crate) fn lock_ttl_from_env() -> Duration {
436 // Test-only: the `cfg`-gated block compiles out of release
437 // binaries (the `test-util` feature is dev-only and `cfg(test)`
438 // is set only when the crate itself is under test), so production
439 // callers pay nothing. In test builds, the read lock serialises
440 // against env-mutating tests that hold an `EnvGuard` for this
441 // key; `env_var_read_lock` returns `None` when the current thread
442 // itself holds the writer, so the env-mutating test does not
443 // deadlock on its own write lock.
444 #[cfg(any(test, feature = "test-util"))]
445 let _read = crate::test_util::env_var_read_lock(ENV_LOCK_TTL_SECONDS);
446 let secs = env::var(ENV_LOCK_TTL_SECONDS)
447 .ok()
448 .and_then(|s| s.parse::<u64>().ok())
449 .filter(|s| *s > 0)
450 .unwrap_or(DEFAULT_LOCK_TTL_SECONDS);
451 saturating_duration_seconds(secs)
452}
453
454/// Convert a `u64` seconds count to [`Duration`], saturating at
455/// [`i64::MAX`] (~292-billion-year sentinel TTL ceiling). Centralises
456/// the `i64::try_from(secs).unwrap_or(i64::MAX)` idiom that previously
457/// lived inline in both [`lock_ttl_from_env`] and
458/// [`crate::manage::compact::Compact::run_into`] (#221).
459#[must_use]
460pub(crate) fn saturating_duration_seconds(secs: u64) -> Duration {
461 Duration::seconds(i64::try_from(secs).unwrap_or(i64::MAX))
462}
463
464/// Same as [`lock_ttl_from_env`] but returns the value as `u64` seconds
465/// for callers that want the raw count rather than a `Duration`. The
466/// `expect` surfaces a future regression that loosens `lock_ttl_from_env`'s
467/// non-negative invariant instead of silently masking it.
468pub(crate) fn lock_ttl_from_env_seconds() -> u64 {
469 u64::try_from(lock_ttl_from_env().whole_seconds())
470 .expect("lock_ttl_from_env returns a non-negative seconds count")
471}
472
473/// Resolve a caller-supplied `Option<u64>` lock TTL to a concrete
474/// seconds value, applying the same zero-clamp as [`lock_ttl_from_env`].
475///
476/// Without this helper, `Compact::run_into` would accept a raw
477/// `Option<u64>` and feed `Some(0)` straight into the engine,
478/// bypassing the #112 clamp that protects against `acquire_lock`
479/// treating every held lock as instantly stale. A zero TTL defeats
480/// per-ref locking and corrupts concurrent pushes (issue #208).
481/// `Doctor::resolved_lock_ttl_seconds` deliberately does NOT route
482/// through this helper — doctor only compares lock ages and never
483/// acquires a lock, so an operator-explicit `--lock-ttl-seconds 0`
484/// is honoured as a "treat every lock as stale" request.
485///
486/// Resolution rules:
487///
488/// * `None` — defer to [`lock_ttl_from_env_seconds`] (env override or default).
489/// * `Some(0)` — defer to [`lock_ttl_from_env_seconds`] as well, so
490/// that an accidental zero from a CLI flag or default-constructed
491/// opts still picks up the operator's env override. This matches
492/// the shape of the existing env-side clamp.
493/// * `Some(n)` for `n > 0` — return `n` unchanged.
494///
495/// No upper bound is enforced: the existing env path accepts any
496/// `u64` and downstream `time::Duration::seconds` saturates at
497/// `i64::MAX` (~292 billion years), which is fine for a TTL ceiling.
498pub(crate) fn resolve_lock_ttl_seconds(opt: Option<u64>) -> u64 {
499 opt.filter(|&n| n > 0)
500 .unwrap_or_else(lock_ttl_from_env_seconds)
501}
502
503/// Handle to an acquired per-ref lock with a live heartbeat task.
504///
505/// Returned by [`acquire_lock`]. The guard owns a background tokio task
506/// that periodically re-PUTs the lock key (overwrite, not conditional)
507/// to refresh `last_modified` and keep stale-lock recovery from
508/// stealing a still-live lock from under a long-running critical
509/// section (issue #118).
510///
511/// ## Release
512///
513/// Call [`release_lock`] (or [`Self::release`]) to relinquish: this
514/// signals cooperative shutdown to the heartbeat, awaits its exit so
515/// any in-flight heartbeat PUT completes (or errors) on the server
516/// BEFORE the DELETE is issued, then deletes the lock key. Without
517/// the await-then-delete ordering, an in-flight heartbeat PUT could
518/// settle on the server after the DELETE and resurrect the lock key
519/// as an orphan (issue #150).
520///
521/// ## Drop
522///
523/// If a guard is dropped without an explicit release (panic, early
524/// return without the post-result match, etc.) the heartbeat is told
525/// to stop via the same shutdown signal and the join handle is
526/// aborted (we cannot `.await` in `Drop`). The lock key remains on
527/// the bucket and will be picked up by the next acquire attempt's
528/// stale-recovery path after `ttl` elapses (bounded by
529/// `ttl + heartbeat_interval` since the last heartbeat). Callers
530/// should still prefer explicit release so the lock is freed
531/// immediately.
532#[must_use = "lock guards must be released; dropping leaks the lock until TTL"]
533pub(crate) struct LockGuard {
534 /// Bucket key of the lock. `pub(crate)` so call sites can format
535 /// it into error messages (the old API surfaced `&str`).
536 lock_key: String,
537 /// Store handle, kept around so [`Self::release`] can delete the
538 /// key without callers re-passing it. Also given to the heartbeat
539 /// task via clone.
540 store: Arc<dyn ObjectStore>,
541 /// Cooperative shutdown signal. The heartbeat task watches this
542 /// receiver via [`tokio::select!`]: a `send(true)` wakes the loop
543 /// at any await point inside the `select!` (between ticks) AND is
544 /// checked synchronously before issuing each `put_bytes`, so no
545 /// PUT can be dispatched after [`Self::release`] has begun.
546 shutdown: watch::Sender<bool>,
547 /// Heartbeat task handle. `Some` while the guard owns a live
548 /// heartbeat; taken by release (awaited) or drop (aborted as a
549 /// fallback since `Drop` cannot await).
550 heartbeat: Option<JoinHandle<()>>,
551}
552
/// Longest time [`LockGuard::release`] waits for the heartbeat task to
/// exit cooperatively before it falls back to aborting the task.
///
/// The heartbeat wakes from its `select!` as soon as shutdown is
/// signalled and only issues a PUT if it had already passed the
/// shutdown re-check, so the worst-case wait is one in-flight
/// `put_bytes` round trip. Five seconds is well above any healthy RTT
/// and well below any reasonable user-visible push timeout. If the
/// network is wedged badly enough to outlast it, the fallback abort
/// keeps `release` from hanging, and the orphan key simply regresses to
/// the pre-fix behaviour (stale-lock recovery after TTL).
const HEARTBEAT_JOIN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
565
566impl LockGuard {
567 /// Release the lock: cooperatively stop the heartbeat (awaiting
568 /// the task's exit so any in-flight PUT settles first), then
569 /// delete the lock key. `NotFound` is mapped to `Ok(())` (the
570 /// heartbeat may have raced a stale-recovery delete, or an
571 /// operator may have cleared the lock manually); every other
572 /// delete failure is propagated.
573 pub(crate) async fn release(mut self) -> Result<(), ObjectStoreError> {
574 // ORDER IS LOAD-BEARING: stop the heartbeat AND wait for its
575 // join handle BEFORE deleting the lock key. The previous
576 // implementation called `handle.abort()` and then DELETE
577 // synchronously, but `abort()` only cancels the future at its
578 // next await point — a `put_bytes` already in flight on the
579 // server completes regardless of cancellation and can settle
580 // AFTER our DELETE, resurrecting the lock key as an orphan
581 // (issue #150). Cooperative shutdown via `watch` + join-await
582 // makes the worst case "one in-flight PUT completes before
583 // DELETE" instead of "one in-flight PUT completes after
584 // DELETE".
585 self.stop_heartbeat().await;
586 delete_idempotent(self.store.as_ref(), &self.lock_key).await
587 }
588
589 /// Signal cooperative shutdown to the heartbeat task and await
590 /// its exit (bounded by [`HEARTBEAT_JOIN_TIMEOUT`]). On timeout
591 /// the handle is `abort()`-ed, restoring the pre-#150 behaviour
592 /// for the pathological case of a `put_bytes` hung past the
593 /// timeout.
594 async fn stop_heartbeat(&mut self) {
595 // `send` only fails when there are no receivers — i.e. the
596 // heartbeat task has already exited. Either way, there's no
597 // PUT we can still prevent, so we ignore the result.
598 let _ = self.shutdown.send(true);
599 let Some(mut handle) = self.heartbeat.take() else {
600 return;
601 };
602 // Borrow the handle into `timeout` so we can still call
603 // `abort()` on it after the wait fails.
604 if tokio::time::timeout(HEARTBEAT_JOIN_TIMEOUT, &mut handle)
605 .await
606 .is_err()
607 {
608 warn!(
609 key = %self.lock_key,
610 timeout_secs = HEARTBEAT_JOIN_TIMEOUT.as_secs(),
611 "lock heartbeat did not exit within join timeout; \
612 falling back to abort (in-flight PUT may still race \
613 the upcoming DELETE)",
614 );
615 handle.abort();
616 }
617 }
618}
619
620impl Drop for LockGuard {
621 fn drop(&mut self) {
622 // `Drop` cannot `.await`, so we cannot mirror the release
623 // path's "wait for in-flight PUT to settle" guarantee here.
624 // The best we can do is (a) signal shutdown so the heartbeat
625 // task exits at its next await point, and (b) abort the
626 // JoinHandle so a future PUT issued while we were dropping
627 // doesn't keep racing forever. The lock key may briefly
628 // outlive the holder; the stale-lock recovery path
629 // (`acquire_lock`) reclaims it after TTL.
630 let _ = self.shutdown.send(true);
631 if let Some(handle) = self.heartbeat.take() {
632 handle.abort();
633 }
634 }
635}
636
637/// Heartbeat interval: `ttl/3`, floored at one second so a pathological
638/// sub-second TTL doesn't busy-loop. We use `ttl/3` rather than `ttl/2`
639/// so a single missed heartbeat (transient network blip with
640/// `MissedTickBehavior::Delay`) still leaves margin before `age > ttl`:
641/// two consecutive misses are needed to push the lock past the
642/// staleness threshold (#118 follow-up).
643pub(crate) fn heartbeat_interval(ttl: Duration) -> std::time::Duration {
644 let secs = ttl.whole_seconds().max(3) / 3;
645 // `try_from` rather than `as`: we just clamped to ≥ 1, but
646 // `clippy::cast_sign_loss` insists on the explicit fallible cast.
647 let secs_u64 =
648 u64::try_from(secs).expect("ttl.whole_seconds().max(3) / 3 is always >= 1 (non-negative)");
649 std::time::Duration::from_secs(secs_u64)
650}
651
652/// Try to acquire the per-ref lock. Returns `Ok(Some(guard))` when the
653/// lock was taken (heartbeat is already running) and `Ok(None)` on
654/// contention (caller should surface a "lock held" error). On a stale
655/// lock (older than `ttl`, with no heartbeat from a live holder), the
656/// lock is deleted and the conditional `put_if_absent` is retried
657/// once.
658///
659/// The race window between `head` and the retry `put_if_absent` is
660/// inherent to non-conditional deletes — another client could acquire
661/// the lock between our delete and retry. We accept that race; the
662/// retry `put_if_absent` will return `Ok(false)` and the user will
663/// retry.
664///
665/// Heartbeat semantics (issue #118): a live holder refreshes the lock
666/// every `ttl/3` so the staleness check correctly excludes locks held
667/// by an in-flight critical section. A long-running [`compact`] no
668/// longer races a concurrent writer that wakes up after the original
669/// TTL elapses.
670///
671/// [`compact`]: crate::packchain::compact::compact
672pub(crate) async fn acquire_lock(
673 store: Arc<dyn ObjectStore>,
674 lock_key: &str,
675 ttl: Duration,
676 now: OffsetDateTime,
677) -> Result<Option<LockGuard>, ObjectStoreError> {
678 if store.put_if_absent(lock_key, Bytes::new()).await? {
679 return Ok(Some(spawn_lock_guard(store, lock_key.to_owned(), ttl)));
680 }
681 let meta = match store.head(lock_key).await {
682 Ok(m) => m,
683 // Lock vanished between put_if_absent and head — another client
684 // released it. Treat as contention; user retries.
685 Err(ObjectStoreError::NotFound(_)) => return Ok(None),
686 Err(e) => return Err(e),
687 };
688 let age = now - meta.last_modified;
689 if age <= ttl {
690 return Ok(None);
691 }
692 debug!(key = %lock_key, age_secs = age.whole_seconds(), "deleting stale lock");
693 delete_idempotent(store.as_ref(), lock_key).await?;
694 if store.put_if_absent(lock_key, Bytes::new()).await? {
695 Ok(Some(spawn_lock_guard(store, lock_key.to_owned(), ttl)))
696 } else {
697 Ok(None)
698 }
699}
700
/// Build a [`LockGuard`] and spawn its heartbeat task. The heartbeat
/// re-PUTs the lock key every `heartbeat_interval(ttl)` (overwrite, not
/// conditional) so the stale-lock recovery branch in [`acquire_lock`]
/// correctly excludes still-held locks. Heartbeat failures are logged
/// at `warn` and retried on the next tick — a transient blip should
/// not surface as a critical-section-aborting error.
///
/// The task watches a [`watch::Receiver<bool>`]: `release` flips it to
/// `true` and the loop exits at the next `select!` await point.
/// Critically, the loop also re-checks the flag synchronously after
/// waking from the tick and BEFORE issuing `put_bytes`, so once
/// `release` has signalled shutdown no further PUT can be dispatched
/// — closing the issue #150 race where an `abort()`-cancelled task
/// had an in-flight `put_bytes` that settled on the server after
/// `release`'s DELETE.
///
/// The returned guard owns the caller's `store` handle and `lock_key`,
/// the shutdown sender, and the heartbeat task's `JoinHandle`; the task
/// itself works on its own clones of the store handle and key.
fn spawn_lock_guard(store: Arc<dyn ObjectStore>, lock_key: String, ttl: Duration) -> LockGuard {
    let interval = heartbeat_interval(ttl);
    // The spawned task must own its data (`async move`), so give it
    // separate handles and keep the originals for the guard.
    let task_store = Arc::clone(&store);
    let task_key = lock_key.clone();
    // Shutdown flag starts `false`; the sender half lives in the guard.
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
    let handle = tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // After a late tick, schedule the next one a full interval after
        // the late wake-up rather than bursting to catch up (see tokio's
        // `MissedTickBehavior::Delay`).
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // Skip the immediate first tick — the acquire just wrote the
        // key, so a refresh in the same millisecond would be wasted
        // bandwidth.
        tick.tick().await; // immediate
        loop {
            tokio::select! {
                // `biased` so shutdown is checked first on every
                // wake: a tick and a shutdown that fire on the same
                // poll must resolve to shutdown, otherwise the tick
                // arm could win and issue one final PUT after release
                // has begun.
                biased;
                // `_` accepts whatever `changed()` resolves to, so any
                // completion of this arm ends the heartbeat (per tokio's
                // docs that includes the sender having been dropped).
                _ = shutdown_rx.changed() => break,
                _ = tick.tick() => {}
            }
            // Re-check the shutdown flag synchronously between the
            // tick and the `put_bytes` await. Without this, a
            // `release` that signals shutdown AFTER `tick.tick()`
            // resolved but BEFORE we issued `put_bytes` would still
            // let the PUT through and race the DELETE — exactly the
            // issue #150 failure mode the `select!` alone does not
            // close.
            if *shutdown_rx.borrow() {
                break;
            }
            // Unconditional overwrite with an empty body, mirroring the
            // empty body written at acquire time.
            match task_store
                .put_bytes(&task_key, Bytes::new(), PutOpts::default())
                .await
            {
                Ok(()) => debug!(key = %task_key, "lock heartbeat refreshed"),
                // Log and keep looping: the next tick retries the refresh.
                Err(e) => warn!(
                    key = %task_key,
                    error = %e,
                    "lock heartbeat refresh failed; will retry",
                ),
            }
        }
    });
    LockGuard {
        lock_key,
        store,
        shutdown: shutdown_tx,
        heartbeat: Some(handle),
    }
}
769
/// Release a previously acquired per-ref lock by delegating to
/// `LockGuard::release`, returning whatever that call returns.
///
/// Expected contract (implemented by `LockGuard::release`, which is not
/// defined in this function — confirm there): `NotFound` on the lock
/// delete is mapped to `Ok(())` (another client or the TTL may have
/// already cleaned it up); every other delete failure is propagated so
/// the caller can surface it.
///
/// Heartbeat semantics: per the documentation on [`spawn_lock_guard`],
/// `release` signals the heartbeat task to shut down through the guard's
/// watch channel before the delete, so a heartbeat refresh cannot
/// re-create the key after it has been removed.
///
/// NOTE(review): earlier wording here said the heartbeat task is
/// "aborted"; [`spawn_lock_guard`] describes `abort()` as the source of
/// the issue #150 race, so that wording looks stale — verify against
/// `LockGuard::release`.
pub(crate) async fn release_lock(guard: LockGuard) -> Result<(), ObjectStoreError> {
    guard.release().await
}
780
781/// Idempotent delete: treats `NotFound` as success (another client may
782/// have raced ahead) but propagates every other error.
783pub(crate) async fn delete_idempotent(
784 store: &dyn ObjectStore,
785 key: &str,
786) -> Result<(), ObjectStoreError> {
787 match store.delete(key).await {
788 Ok(()) | Err(ObjectStoreError::NotFound(_)) => Ok(()),
789 Err(e) => Err(e),
790 }
791}
792
793/// Best-effort delete of the prior bundle once the new bundle is durable.
794///
795/// Issue #121: `perform_push_under_lock` writes the new bundle first
796/// (the durable commit) and then deletes the previous bundle. A
797/// non-`NotFound` error on the delete must NOT fail the push — the new
798/// bundle is already on the bucket, so reporting failure to the user
799/// is a lie about the remote state. The two-bundle state that results
800/// from the orphan trips the under-lock "multiple bundles" guard on
801/// the next push, which directs the operator to `doctor`; the warn
802/// log gives them the orphan key directly.
803///
804/// Mirrors `force_push_baseline_cleanup` in `packchain::push`
805/// (issue #113).
806///
807/// Issue #157: this is now the fallback path used only when the
808/// preferred deferred-via-tombstone path
809/// ([`defer_prior_bundle_via_tombstone`]) cannot be taken — namely a
810/// prior key whose stem does not parse as a [`Sha40`]. The on-bucket
811/// layout filter ([`is_bundle_candidate`]) already rejects malformed
812/// stems before they reach the push path, so this branch is reachable
813/// only via a future schema loosening or manual bucket tampering.
814async fn delete_prior_bundle_best_effort(
815 store: &dyn ObjectStore,
816 remote_ref: &RefName,
817 prior_key: &str,
818) {
819 if let Err(e) = delete_idempotent(store, prior_key).await {
820 warn!(
821 ref_path = %remote_ref.as_str(),
822 key = %prior_key,
823 error = %e,
824 "prior-bundle cleanup failed (new bundle already committed); \
825 orphan key left for manual cleanup",
826 );
827 }
828}
829
830/// Defer the prior-bundle delete to `gc sweep` by writing a baseline
831/// tombstone naming the old SHA.
832///
833/// Issue #157: synchronously deleting the prior bundle from the push
834/// path created a race against in-flight fetches. A client that ran
835/// `list` and saw `refs/heads/main -> old_sha` then issued
836/// `fetch old_sha refs/heads/main` against the EXACT
837/// `<prefix>/<ref>/<old_sha>.bundle` path — [`fetch_one`] does not
838/// re-list or consult HEAD. If a concurrent force-push synchronously
839/// deleted that key between the `list` advertisement and the fetcher's
840/// GET, fetch failed with `NotFound`.
841///
842/// The fix mirrors the packchain force-push tombstone path (issue
843/// #134, `force_push_baseline_cleanup` in `packchain::push`): write a
844/// `<prefix>/gc/baseline-tomb-*.json` record naming the prior SHA, and
845/// let `gc sweep` reclaim the bundle after the configured grace
846/// window. Until grace expires the prior bundle remains readable, so
847/// any fetcher that advertised it from a stale list can still
848/// complete.
849///
850/// The tombstone format and key namespace are shared with the
851/// packchain engine because `gc sweep` is engine-agnostic at the
852/// reclamation layer: [`sweep_one_baseline_tombstone`] derives the
853/// bundle key from `(ref_name, sha)` and runs the same live-state
854/// recheck regardless of which engine wrote the tombstone.
855///
856/// **Fallback semantics on tombstone write failure**: `gc sweep` only
857/// reclaims keys it has a tombstone for, so a tombstone PUT failure
858/// would otherwise orphan the prior bundle indefinitely. To preserve
859/// the issue #121 "two-bundle state surfaces via the doctor message"
860/// recovery path, this helper falls back to the synchronous
861/// best-effort delete from [`delete_prior_bundle_best_effort`] on
862/// either:
863///
864/// - a prior key whose stem does not parse as a [`Sha40`] (no way to
865/// name it in the tombstone body), or
866/// - a [`write_baseline_tombstone_best_effort`] PUT failure (the
867/// helper already logged a warn naming the orphan).
868async fn defer_prior_bundle_via_tombstone(
869 store: &dyn ObjectStore,
870 prefix: Option<&str>,
871 remote_ref: &RefName,
872 prior_key: &str,
873 local_sha: Sha,
874) {
875 let Some(prior_stem) = parse_remote_sha_stem_from_key(prior_key) else {
876 warn!(
877 ref_path = %remote_ref.as_str(),
878 key = %prior_key,
879 "prior bundle key does not parse as a valid SHA; falling back to synchronous delete",
880 );
881 delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
882 return;
883 };
884 // `parse_remote_sha_stem_from_key` already validated the stem
885 // against `is_valid_bundle_stem` (40 lowercase hex), so
886 // `Sha40::try_new(prior_stem)` is infallible in practice; surface
887 // a failure as the synchronous-delete fallback rather than
888 // panicking — a future tightening of `Sha40` validation must not
889 // crash the push. `local_sha` still routes through `to_string`
890 // because `Sha` (gix `ObjectId`) only exposes a binary form.
891 let (Ok(prior_sha40), Ok(local_sha40)) = (
892 Sha40::try_new(prior_stem),
893 Sha40::try_new(local_sha.to_string()),
894 ) else {
895 warn!(
896 ref_path = %remote_ref.as_str(),
897 key = %prior_key,
898 "prior or local sha failed Sha40 validation; falling back to synchronous delete",
899 );
900 delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
901 return;
902 };
903 let wrote = write_baseline_tombstone_best_effort(
904 store,
905 prefix,
906 remote_ref,
907 &prior_sha40,
908 &local_sha40,
909 "bundle-engine-force-push",
910 )
911 .await;
912 if wrote {
913 debug!(
914 ref_path = %remote_ref.as_str(),
915 key = %prior_key,
916 "prior bundle deferred to gc sweep via baseline tombstone",
917 );
918 } else {
919 // `write_baseline_tombstone_best_effort` already logged a warn
920 // naming the orphan key. Fall back to the synchronous delete
921 // so the next push's multi-bundle guard does not flag this
922 // case to the operator unnecessarily.
923 delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
924 }
925}
926
927/// Best-effort upload of the optional zip artifact once the new bundle
928/// is durable.
929///
930/// Issue #127: `perform_push_under_lock` writes the new bundle, `HEAD`,
931/// and `FORMAT` (the git-protocol contract for a successful push) and
932/// then uploads an optional `repo.zip` CodePipeline-side convenience
933/// artifact. A non-`NotFound` error on the zip upload must NOT fail
934/// the push — the bundle is already durable, so reporting failure is
935/// a lie about the remote state. The next push re-puts the zip at
936/// the same key (idempotent), so an operator who notices the warn log
937/// or wants the artifact re-uploaded simply re-pushes.
938///
939/// Mirrors [`delete_prior_bundle_best_effort`] (issue #121) and
940/// `force_push_baseline_cleanup` in `packchain::push` (issue #113):
941/// log at warn with `ref_path`, `key`, and `error`, and return
942/// without propagating.
943async fn upload_zip_artifact_best_effort(
944 store: &dyn ObjectStore,
945 remote_ref: &RefName,
946 zip_dest: &str,
947 archive_path: &Path,
948 opts: PutOpts,
949) {
950 if let Err(e) = store.put_path(zip_dest, archive_path, opts).await {
951 warn!(
952 ref_path = %remote_ref.as_str(),
953 key = %zip_dest,
954 error = %e,
955 "zip artifact upload failed (bundle already committed); \
956 retry the push to re-upload the zip at the same key",
957 );
958 }
959}
960
961/// Drive a batch of `push` commands sequentially.
962///
963/// Each command is parsed, executed under its own per-ref lock, and
964/// produces one [`PushOutcome`]. Catastrophic errors (transport,
965/// malformed protocol input) abort the batch and bubble out as
966/// [`PushError`]; per-ref failures are encoded as
967/// [`PushOutcome::Error`] and the batch continues.
968pub(crate) async fn push_batch(
969 ctx: &super::BatchCtx,
970 kind: BackendKind,
971 zip: bool,
972 engine: StorageEngine,
973 cmds: Vec<String>,
974) -> Result<Vec<PushOutcome>, PushError> {
975 if cmds.is_empty() {
976 return Ok(Vec::new());
977 }
978 debug!(count = cmds.len(), "processing push batch");
979
980 let config = PushConfig {
981 zip,
982 engine,
983 ttl: lock_ttl_from_env(),
984 kind,
985 };
986 let mut outcomes = Vec::with_capacity(cmds.len());
987
988 for cmd in cmds {
989 // `parse_push_args` failures are catastrophic: a malformed `push`
990 // line means we cannot trust subsequent commands. Abort the batch.
991 let spec = parse_push_args(&cmd)?;
992 // Capture the ref name before `push_one` consumes the spec so we
993 // can still render an `error <ref> ...` line if the call fails.
994 let remote_ref_str = spec.remote_ref.as_str().to_owned();
995 let outcome = match push_one(
996 Arc::clone(&ctx.store),
997 ctx.prefix.as_deref(),
998 ctx.repo_dir.as_path(),
999 &config,
1000 OffsetDateTime::now_utc(),
1001 spec,
1002 )
1003 .await
1004 {
1005 Ok(o) => o,
1006 // Per-push operational failures (transport, local git, local I/O,
1007 // malformed remote bundle SHA) become `error <ref>` lines so the
1008 // batch can continue. Without this, a single 5xx blip in the
1009 // middle of a multi-ref push would silently drop the outcome
1010 // lines for already-completed pushes and leave git's local
1011 // ref-tracking inconsistent with the remote.
1012 Err(e)
1013 if matches!(
1014 e,
1015 PushError::Store(_) | PushError::Git(_) | PushError::Io(_) | PushError::Sha(_)
1016 ) =>
1017 {
1018 let chain = full_error_chain(&e);
1019 warn!(ref = %remote_ref_str, error = %chain, "push ref failed");
1020 PushOutcome::Error {
1021 remote_ref: remote_ref_str,
1022 message: format!(r#""{chain}"?"#),
1023 }
1024 }
1025 Err(e) => return Err(e),
1026 };
1027 outcomes.push(outcome);
1028 }
1029 Ok(outcomes)
1030}
1031
1032/// Render a [`PushError`] as a colon-separated chain so the
1033/// `error <ref>` wire line and the stderr log carry every level of
1034/// causal context.
1035///
1036/// `PushError`'s `thiserror` formats inline the immediate source via
1037/// `{0}` / `{source}`, and some of those sources (notably
1038/// [`ObjectStoreError::Network`], whose own Display embeds its boxed
1039/// inner error) themselves inline a further level — so a naive
1040/// chain-walk would produce `"object-store error during push: network
1041/// error: dns failure: dns failure"`. [`super::append_source_chain`]
1042/// dedups any level whose text is already at the tail of `msg`, so
1043/// the rendered chain is uniform across `Store`, `Git`, `Io`, and `Sha`
1044/// variants without per-variant special-casing.
1045fn full_error_chain(err: &PushError) -> String {
1046 let mut msg = err.to_string();
1047 super::append_source_chain(&mut msg, err);
1048 msg
1049}
1050
/// Recoverable per-push errors discovered while talking to the local
/// repo. Returned by [`local_git_work`] in the inner `Result` and mapped
/// by [`prepare_push`] into [`PushOutcome::Error`] strings, so they end
/// the current push without aborting the batch.
enum GitProbeError {
    /// `local_spec` did not resolve in the local repo. Rendered as the
    /// `"<local_spec> not found"` wire message.
    LocalRefNotFound,
    /// Pre-existing remote bundle is not an ancestor of `local_sha`.
    /// Only produced for non-force pushes; rendered through
    /// `not_ancestor_wire_message`.
    NotAncestor,
}
1059
/// Result of the local git work that must run synchronously because
/// `gix::Repository` is `!Sync` and cannot cross `.await` points without
/// making the surrounding future `!Send`. Produced by
/// [`local_git_work`] and consumed by [`prepare_push`].
struct LocalGit {
    /// Resolved commit OID for the user's `local_spec`.
    local_sha: Sha,
    /// Directory handed to `git::bundle_at` when the bundle file is
    /// created: the repository's worktree, or its git dir when the
    /// repository has no worktree.
    cwd: PathBuf,
    /// On the zip path: archive on disk + metadata for the upload.
    /// `None` on the regular push path. The `TempDir` inside keeps the
    /// archive file alive until the async caller has uploaded it.
    zip_artifacts: Option<ZipArtifacts>,
    /// Whether the pre-lock-listed remote bundle's SHA was an ancestor
    /// of `local_sha`. `true` when there was no pre-existing bundle.
    /// Stashed so [`perform_push_under_lock`] can decide, under the
    /// per-ref lock, whether a force-push against a now-`PROTECTED#`
    /// ref is a safe fast-forward (issue #129).
    pre_existing_was_ancestor: bool,
}
1079
/// Zip-variant artifacts built synchronously by [`local_git_work`] and
/// carried to the async upload step.
struct ZipArtifacts {
    /// Path of the archive produced by `git::archive`, located inside
    /// `_tempdir`.
    archive_path: PathBuf,
    /// First 8 hex characters of the pushed commit's SHA.
    short_sha: String,
    /// Result of `git::last_commit_message`, or an empty string when that
    /// lookup yields nothing.
    // NOTE(review): how `short_sha` / `commit_msg` are attached to the
    // upload is decided by the consumer, outside this definition.
    commit_msg: String,
    /// Owned tempdir that backs `archive_path`; dropped after upload.
    _tempdir: tempfile::TempDir,
}
1087
/// All state computed before the per-ref lock is acquired.
///
/// Passed to [`perform_push_under_lock`] so the argument count stays within
/// the clippy budget and the two phases of [`push_one`] have a clean
/// boundary: pre-lock work (bundle listing, local git, bundle creation)
/// vs. under-lock work (protection check, re-list, upload, HEAD/FORMAT
/// init, old-bundle cleanup).
struct PushReadyState {
    /// Destination ref on the remote; also determines the lock key.
    remote_ref: RefName,
    /// Commit OID that `local_spec` resolved to in the local repo.
    local_sha: Sha,
    /// Key of the pre-existing bundle (if any). Checked inside the lock
    /// to detect concurrent pushes (stale-remote guard).
    pre_existing: Option<String>,
    /// Bundle file produced by `git::bundle_at`, located inside
    /// `_temp_dir`.
    bundle_path: PathBuf,
    /// Zip archive + metadata when the zip variant is enabled; `None`
    /// otherwise.
    zip_artifacts: Option<ZipArtifacts>,
    /// Storage engine copied from the batch's `PushConfig`.
    engine: StorageEngine,
    /// User's `--force` intent. Carried into the lock window so the
    /// `PROTECTED#` check can run *under* the lock and close the TOCTOU
    /// window between the pre-lock check and the lock acquisition
    /// (issue #129).
    force: bool,
    /// Whether the pre-lock-listed bundle's SHA was an ancestor of
    /// `local_sha` (always `true` when there was no pre-existing
    /// bundle). Used together with `force` and the under-lock
    /// protection re-check to decide whether to reject a force-push
    /// that became protected mid-flight.
    pre_existing_was_ancestor: bool,
    /// Original local refspec, kept so the under-lock `NotAncestor` error
    /// message renders the same text the pre-lock arm would have used.
    local_spec: String,
    /// Issue #165: tombstone set captured during [`prepare_push`] so the
    /// under-lock [`bundles_for_ref`] call inside
    /// [`perform_push_under_lock`] does not re-list `<prefix>/gc/` and
    /// re-fetch every baseline tombstone. Sound because all tombstone
    /// writers for this ref serialize through the same per-ref lock —
    /// no new tombstone for this ref can land in the pre-lock /
    /// under-lock window.
    hidden_bundles: HashSet<String>,
    /// Keeps the temp directory (and `bundle_path`) alive until the bundle
    /// is uploaded inside `perform_push_under_lock`.
    _temp_dir: tempfile::TempDir,
}
1130
/// Outcome of [`prepare_push`]: tells [`push_one`] whether to enter the
/// per-ref lock window and, if so, with which work.
///
/// - [`PrepareOutcome::Ready`] — pre-lock work completed; caller must
///   acquire the per-ref lock and call [`perform_push_under_lock`].
/// - [`PrepareOutcome::Delete`] — delete refspec (`:<ref>`); caller must
///   acquire the per-ref lock and call [`delete_remote_ref_under_lock`].
///   Issue #133: the listing and sweep MUST run under the lock so a
///   concurrent push that lands a new bundle cannot turn the delete
///   into a silent false success.
/// - [`PrepareOutcome::Done`] — outcome already decided pre-lock
///   (multi-bundle corruption, ancestry failure, …); caller returns it
///   directly without taking the lock.
enum PrepareOutcome {
    // `PushReadyState` is the largest payload (paths, temp dir guard,
    // hidden-bundle hash set added in #165); boxing it keeps the enum
    // itself small, since an enum is sized by its largest variant.
    Ready(Box<PushReadyState>),
    // `zip` is copied from the batch's `PushConfig` by `prepare_push`.
    Delete { remote_ref: RefName, zip: bool },
    Done(PushOutcome),
}
1152
1153/// Open the repo, resolve `local_sha`, compute ancestry of the
1154/// pre-existing remote bundle, and (for the zip variant) build the
1155/// archive synchronously. The `Repository` handle is dropped before
1156/// this returns so the caller's `Future` can stay `Send`. Archive
1157/// bytes are NOT read here — the async caller does that with
1158/// `tokio::fs::read` to avoid blocking the runtime on file I/O.
1159///
1160/// Ancestry is always computed and returned via
1161/// [`LocalGit::pre_existing_was_ancestor`]; this function only emits
1162/// [`GitProbeError::NotAncestor`] for the non-force case. Force pushes
1163/// defer the ancestry-vs-protection decision to
1164/// [`perform_push_under_lock`] so a concurrent `protect` cannot race
1165/// the pre-lock check (issue #129).
1166fn local_git_work(
1167 repo_dir: &Path,
1168 local_spec: &str,
1169 pre_existing_sha: Option<Sha>,
1170 force_push: bool,
1171 zip: bool,
1172) -> Result<Result<LocalGit, GitProbeError>, GitError> {
1173 let repo = gix::open(repo_dir)?;
1174 let cwd = repo.workdir().unwrap_or_else(|| repo.git_dir()).to_owned();
1175
1176 let Ok(local_sha) = git::branch::resolve(&repo, local_spec) else {
1177 return Ok(Err(GitProbeError::LocalRefNotFound));
1178 };
1179
1180 let pre_existing_was_ancestor = match pre_existing_sha {
1181 None => true,
1182 Some(remote_sha) => git::is_ancestor(&repo, remote_sha, local_sha)?,
1183 };
1184 if !force_push && !pre_existing_was_ancestor {
1185 return Ok(Err(GitProbeError::NotAncestor));
1186 }
1187
1188 let zip_artifacts = if zip {
1189 let tempdir = tempfile::Builder::new()
1190 .prefix("git_remote_object_store_archive_")
1191 .tempdir()?;
1192 let archive_path = git::archive(&repo, tempdir.path(), local_spec)?;
1193 let commit_msg = git::last_commit_message(&repo).unwrap_or_default();
1194 let sha_hex = local_sha.to_string();
1195 let short_sha = sha_hex[..8].to_owned();
1196 Some(ZipArtifacts {
1197 archive_path,
1198 short_sha,
1199 commit_msg,
1200 _tempdir: tempdir,
1201 })
1202 } else {
1203 None
1204 };
1205
1206 drop(repo);
1207 Ok(Ok(LocalGit {
1208 local_sha,
1209 cwd,
1210 zip_artifacts,
1211 pre_existing_was_ancestor,
1212 }))
1213}
1214
1215/// Perform all pre-lock work for a push: resolve the local ref, check
1216/// ancestry, list the remote bundles, and create the local bundle file.
1217/// Returns [`PrepareOutcome::Done`] for cases that are already resolved
1218/// (delete-refspec, protection check, multiple-bundle error, ancestry
1219/// failure) or [`PrepareOutcome::Ready`] when the caller should proceed
1220/// to acquire the per-ref lock and call [`perform_push_under_lock`].
1221async fn prepare_push(
1222 store: &dyn ObjectStore,
1223 prefix: Option<&str>,
1224 repo_dir: &Path,
1225 config: &PushConfig,
1226 spec: PushSpec,
1227) -> Result<PrepareOutcome, PushError> {
1228 let PushSpec {
1229 force,
1230 local_spec,
1231 remote_ref,
1232 } = spec;
1233 let remote_ref_str = remote_ref.as_str().to_owned();
1234
1235 if local_spec.is_empty() {
1236 // Issue #133: do NOT list / sweep here. The bundle engine's
1237 // delete must run INSIDE the per-ref lock so a concurrent push
1238 // landing a new bundle between our listing and our deletion
1239 // cannot produce a silent false success. Defer to
1240 // [`delete_remote_ref_under_lock`] in [`push_one`].
1241 return Ok(PrepareOutcome::Delete {
1242 remote_ref,
1243 zip: config.zip,
1244 });
1245 }
1246
1247 // Issue #129: do NOT call `is_protected` here. A pre-lock check
1248 // races against a concurrent `protect`: the check could pass and a
1249 // `PROTECTED#` marker could land before we acquire the per-ref
1250 // lock, letting a force-push overwrite a now-protected ref. The
1251 // check is performed *under* the lock in
1252 // [`perform_push_under_lock`] instead. Pre-lock we just respect the
1253 // user's `--force` intent; the `local_git_work` probe still returns
1254 // ancestry info via [`LocalGit::pre_existing_was_ancestor`] so the
1255 // under-lock arm can render the same NotAncestor message a
1256 // protection-demoted non-force push would have produced.
1257 let force_push = force;
1258 debug!(local = %local_spec, remote = %remote_ref, force_push, "push");
1259
1260 // Issue #165: compute the tombstone set once here and reuse it under
1261 // the per-ref lock in [`perform_push_under_lock`]. The lock ensures
1262 // no concurrent writer can publish a tombstone for this ref between
1263 // the two `bundles_for_ref` calls. The cached set is unused on the
1264 // multi-bundle / delete / parse-error early-return paths — a wasted
1265 // listing — but each path already terminates the push, so the
1266 // ~one-call overhead is bounded.
1267 let hidden_bundles = tombstoned_bundle_keys(store, prefix).await?;
1268 let pre_bundles = bundles_for_ref(store, prefix, &remote_ref, Some(&hidden_bundles)).await?;
1269 if pre_bundles.len() > 1 {
1270 return Ok(PrepareOutcome::Done(PushOutcome::Error {
1271 remote_ref: remote_ref_str,
1272 message:
1273 r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
1274 .to_owned(),
1275 }));
1276 }
1277 let pre_existing = pre_bundles.into_iter().next().map(|m| m.key);
1278
1279 let pre_existing_sha = match pre_existing.as_deref() {
1280 None => None,
1281 Some(key) => {
1282 let Some(s) = parse_remote_sha_from_key(key) else {
1283 return Ok(PrepareOutcome::Done(PushOutcome::Error {
1284 remote_ref: remote_ref_str,
1285 message: format!(
1286 r#""unable to parse remote bundle key {key:?}; run git-remote-object-store doctor to fix."?"#,
1287 ),
1288 }));
1289 };
1290 Some(s)
1291 }
1292 };
1293
1294 // Sync gix work (rev-parse / ancestor / archive) runs in a
1295 // dedicated scope so the !Sync `Repository` is dropped before any
1296 // .await — keeps `push_batch`'s future `Send`.
1297 let probe = local_git_work(
1298 repo_dir,
1299 &local_spec,
1300 pre_existing_sha,
1301 force_push,
1302 config.zip,
1303 )?;
1304 let local = match probe {
1305 Ok(local) => local,
1306 Err(GitProbeError::LocalRefNotFound) => {
1307 return Ok(PrepareOutcome::Done(PushOutcome::Error {
1308 remote_ref: remote_ref_str,
1309 message: format!(r#""{local_spec} not found"?"#),
1310 }));
1311 }
1312 Err(GitProbeError::NotAncestor) => {
1313 return Ok(PrepareOutcome::Done(PushOutcome::Error {
1314 remote_ref: remote_ref_str,
1315 message: not_ancestor_wire_message(&local_spec),
1316 }));
1317 }
1318 };
1319
1320 let temp_dir = tempfile::Builder::new()
1321 .prefix("git_remote_object_store_push_")
1322 .tempdir()?;
1323 let bundle_path =
1324 git::bundle_at(&local.cwd, temp_dir.path(), local.local_sha, &local_spec).await?;
1325
1326 Ok(PrepareOutcome::Ready(Box::new(PushReadyState {
1327 remote_ref,
1328 local_sha: local.local_sha,
1329 pre_existing,
1330 bundle_path,
1331 zip_artifacts: local.zip_artifacts,
1332 engine: config.engine,
1333 force,
1334 pre_existing_was_ancestor: local.pre_existing_was_ancestor,
1335 local_spec,
1336 hidden_bundles,
1337 _temp_dir: temp_dir,
1338 })))
1339}
1340
1341/// Execute one push or delete: prepare, lock, do work, release.
1342///
1343/// Both the upload path ([`perform_push_under_lock`]) and the delete
1344/// path ([`delete_remote_ref_under_lock`]) run inside the SAME per-ref
1345/// lock window with identical acquire/release accounting:
1346///
1347/// - `acquire_lock` returning `None` → emit the standard
1348/// "failed to acquire ref lock" wire error without doing work.
1349/// - `release_lock` failing AFTER successful work → downgrade the
1350/// outcome to an `Error` so the operator is alerted to a potentially
1351/// dangling lock.
1352/// - A genuine work error takes priority over a release failure — the
1353/// release error is logged but never masks the original failure.
1354///
1355/// Issue #133: putting the delete path under this same window closes
1356/// the race where a concurrent push could land a new bundle between a
1357/// pre-lock listing and a pre-lock sweep, producing a silent false
1358/// success.
1359async fn push_one(
1360 store: Arc<dyn ObjectStore>,
1361 prefix: Option<&str>,
1362 repo_dir: &Path,
1363 config: &PushConfig,
1364 now: OffsetDateTime,
1365 spec: PushSpec,
1366) -> Result<PushOutcome, PushError> {
1367 let (remote_ref_str, work): (String, UnderLockWork) =
1368 match prepare_push(store.as_ref(), prefix, repo_dir, config, spec).await? {
1369 PrepareOutcome::Done(o) => return Ok(o),
1370 PrepareOutcome::Ready(state) => (
1371 state.remote_ref.as_str().to_owned(),
1372 // `state` is already `Box<PushReadyState>` (`PrepareOutcome::Ready`
1373 // is boxed to keep the enum's largest variant compact); reuse the
1374 // existing allocation instead of re-boxing.
1375 UnderLockWork::Push(state),
1376 ),
1377 PrepareOutcome::Delete { remote_ref, zip } => (
1378 remote_ref.as_str().to_owned(),
1379 UnderLockWork::Delete { remote_ref, zip },
1380 ),
1381 };
1382
1383 let lock = match &work {
1384 UnderLockWork::Push(state) => lock_key(prefix, &state.remote_ref),
1385 UnderLockWork::Delete { remote_ref, .. } => lock_key(prefix, remote_ref),
1386 };
1387
1388 let Some(guard) = acquire_lock(Arc::clone(&store), &lock, config.ttl, now).await? else {
1389 return Ok(PushOutcome::Error {
1390 remote_ref: remote_ref_str,
1391 message: format!(
1392 // `push_one` covers BOTH the Push and Delete arms under the
1393 // same per-ref lock, so the contention message names both
1394 // — mirroring the packchain delete path's wording. A
1395 // delete-arm caller previously saw a misleading
1396 // "may be pushing" hint here.
1397 r#""failed to acquire ref lock at {lock}. Another client may be pushing or deleting. If this persists beyond {}s, run git-remote-object-store doctor to inspect and optionally clear stale locks."?"#,
1398 config.ttl.whole_seconds(),
1399 ),
1400 });
1401 };
1402
1403 let result = match work {
1404 UnderLockWork::Push(state) => {
1405 perform_push_under_lock(store.as_ref(), prefix, config.kind, *state).await
1406 }
1407 UnderLockWork::Delete { remote_ref, zip } => {
1408 delete_remote_ref_under_lock(store.as_ref(), prefix, &remote_ref, zip, &lock).await
1409 }
1410 };
1411 let release_result = release_lock(guard).await;
1412
1413 match (&result, release_result) {
1414 (Ok(PushOutcome::Ok { .. }), Err(e)) => {
1415 warn!(key = %lock, error = %e, "failed to release lock");
1416 Ok(PushOutcome::Error {
1417 remote_ref: remote_ref_str,
1418 message: format!(
1419 r#""failed to release lock. You may need to manually remove the lock {lock} from the server or use git-remote-object-store doctor to fix."?"#,
1420 ),
1421 })
1422 }
1423 (_, Err(e)) => {
1424 warn!(key = %lock, error = %e, "lock release failed (work already errored)");
1425 result
1426 }
1427 _ => result,
1428 }
1429}
1430
/// The work to perform inside the per-ref lock acquired by [`push_one`].
///
/// `PushReadyState` is significantly larger than the `Delete` payload
/// (paths, the temp dir guard, the captured refspec); boxing it keeps
/// [`UnderLockWork`] itself small, since an enum is sized by its largest
/// variant.
enum UnderLockWork {
    /// Upload path: handed to [`perform_push_under_lock`]. Reuses the box
    /// produced by [`prepare_push`].
    Push(Box<PushReadyState>),
    /// Delete-refspec path: handed to [`delete_remote_ref_under_lock`]
    /// together with the lock key.
    Delete { remote_ref: RefName, zip: bool },
}
1440
/// Re-list under the lock, upload the bundle, init HEAD, write the `FORMAT`
/// key, defer the previous bundle's delete via a baseline tombstone (issue
/// #157), optionally upload `repo.zip`. Split out so the lock release in
/// the caller is unconditional.
///
/// Steps, in execution order:
///
/// 1. Force-push protection check (issue #129).
/// 2. Re-list the ref's bundles and compare against the pre-lock snapshot
///    (`pre_existing`); bail out on multiple bundles or a stale snapshot.
/// 3. Upload the bundle to its `<sha>.bundle` key.
/// 4. `put_if_absent` the `HEAD` key and the `FORMAT` key.
/// 5. Tombstone the previous bundle when it differs from the new key.
/// 6. Upload the zip archive when `zip_artifacts` is present.
///
/// # Returns
///
/// `Ok(PushOutcome::Ok)` when the bundle and bootstrap keys were written.
/// `Ok(PushOutcome::Error)` — a wire-level refusal, not a Rust error — when
/// the ref is protected against this force-push, when more than one bundle
/// is visible for the ref, or when the under-lock listing disagrees with
/// the pre-lock snapshot.
///
/// # Errors
///
/// Propagates (via `?`) failures from `is_protected`, `bundles_for_ref`,
/// `put_path` and both `put_if_absent` calls. The tombstone write and the
/// zip upload are awaited without `?`, so nothing they return is propagated
/// from this function.
async fn perform_push_under_lock(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    kind: BackendKind,
    state: PushReadyState,
) -> Result<PushOutcome, PushError> {
    // `_temp_dir` is bound by name (not `_`), so the value stays alive
    // until this function returns.
    // NOTE(review): presumably the tempdir that owns `bundle_path` and
    // the zip archive — confirm against `PushReadyState`.
    let PushReadyState {
        remote_ref,
        local_sha,
        pre_existing,
        bundle_path,
        zip_artifacts,
        engine,
        force,
        pre_existing_was_ancestor,
        local_spec,
        hidden_bundles,
        _temp_dir,
    } = state;
    // Owned copy for the outcome values; `remote_ref` itself is still
    // borrowed by the key helpers below.
    let remote_ref_str = remote_ref.as_str().to_owned();

    // Issue #129: under-lock force-push protection check. A pre-lock
    // check would race against a concurrent `protect`; running here —
    // after `acquire_lock`, before any writes — closes that TOCTOU
    // window. The historical "protected ref + force" semantic is
    // "demote to non-force": if the local SHA would have been a
    // fast-forward against the pre-lock-listed remote (already pinned
    // by the stale-remote guard below), let the push through; if not,
    // emit the same NotAncestor wire error a pre-lock non-force probe
    // would have produced. The `is_protected` helper uses `head`, not
    // `list`, so this adds one cheap key probe and does not duplicate
    // any existing under-lock listing.
    //
    // `&&` short-circuits: the store probe only happens for a force
    // push that was not a fast-forward.
    if force && !pre_existing_was_ancestor && is_protected(store, prefix, &remote_ref).await? {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: not_ancestor_wire_message(&local_spec),
        });
    }

    // Issue #165: reuse the tombstone set captured in `prepare_push`.
    // The per-ref lock prevents any new tombstone for this ref from
    // landing between then and now.
    let current = bundles_for_ref(store, prefix, &remote_ref, Some(&hidden_bundles)).await?;
    if current.len() > 1 {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#.to_owned(),
        });
    }
    // At most one bundle remains at this point; keep only its key.
    let current_key = current.into_iter().next().map(|m| m.key);
    // Compare the pre-lock snapshot to the under-lock reality. Five
    // (pre / under) combinations are possible; only the first two —
    // where both sides agree — may proceed:
    //   None    / None    — happy path, no bundle existed before or now.
    //   Some(K) / Some(K) — happy path, same bundle (e.g. force-push that
    //                       re-uploads against the same SHA).
    //   Some(A) / Some(B) — concurrent push replaced our snapshot's
    //                       bundle. Reject: we'd silently overwrite it.
    //   None    / Some    — concurrent push created a bundle after our
    //                       pre-lock list. Reject.
    //   Some    / None    — concurrent delete (`git push :ref`) removed
    //                       our snapshot's bundle. Reject.
    // Without this guard a concurrent writer between the pre-lock list and
    // the lock acquisition would be silently overwritten.
    if pre_existing.as_deref() != current_key.as_deref() {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: r#""stale remote. Please fetch and retry."?"#.to_owned(),
        });
    }

    let bundle_dest = keys::bundle_key(prefix, &remote_ref, local_sha);
    // Stat once for "X / total" formatting in progress logs. The
    // multipart upload re-stats `bundle_path` to size the part plan;
    // a brief race here would surface as the log showing a stale
    // total — never as wrong bytes — so we tolerate it. On stat
    // failure (vanishingly rare on a tempdir we just wrote) fall
    // back to "unknown" rather than aborting the push.
    let bundle_total = tokio::fs::metadata(&bundle_path)
        .await
        .map(|m| m.len())
        .ok();
    let bundle_opts = PutOpts {
        progress: Some(bundle_progress_sink(&bundle_dest, bundle_total)),
        ..PutOpts::default()
    };
    // First write of the critical section. A failure here returns
    // before HEAD, FORMAT or the tombstone are touched.
    store
        .put_path(&bundle_dest, &bundle_path, bundle_opts)
        .await?;

    // HEAD bootstrap: write only if absent. Single round-trip via
    // put_if_absent — we don't care about the boolean (existing HEAD is
    // intentionally preserved).
    let head = head_key(prefix);
    store
        .put_if_absent(
            &head,
            Bytes::copy_from_slice(remote_ref.as_str().as_bytes()),
        )
        .await?;

    // Lock in the storage engine on the first push. `put_if_absent` makes
    // concurrent first-push races safe — both write the same `bundle` value,
    // so the one that loses is a no-op. The boolean result is intentionally
    // ignored: an existing FORMAT key was already validated at connect time by
    // `backend::build`.
    let format_key = keys::join(prefix, "FORMAT");
    store
        .put_if_absent(&format_key, Bytes::from_static(engine.as_str().as_bytes()))
        .await?;

    // Skipped when no bundle existed before, and when the previous key
    // equals the key we just wrote (same-SHA re-push) — tombstoning
    // that key would target the bundle uploaded above.
    if let Some(prev) = current_key
        && prev != bundle_dest
    {
        // Issue #157: defer the prior-bundle delete to `gc sweep` via a
        // baseline tombstone instead of deleting it synchronously. A
        // concurrent fetcher that already advertised the prior SHA via
        // `list` would otherwise race the GET against this DELETE; the
        // tombstone gives `fetch_one` the grace window it needs.
        defer_prior_bundle_via_tombstone(store, prefix, &remote_ref, &prev, local_sha).await;
    }

    if let Some(artifacts) = zip_artifacts {
        let zip_dest = archive_key(prefix, &remote_ref);
        // Same tolerant stat as for the bundle: `None` on failure.
        let zip_total = tokio::fs::metadata(&artifacts.archive_path)
            .await
            .map(|m| m.len())
            .ok();
        // Issue #161: the `codepipeline-artifact-revision-summary` user
        // metadata is only meaningful on S3 (AWS CodePipeline consumes it
        // as `x-amz-meta-codepipeline-artifact-revision-summary`). Azure
        // metadata names must be valid C# identifiers (no hyphens), so
        // attaching the header on the Azure path causes the entire blob
        // upload to fail with `InvalidMetadata` — and the issue #127
        // best-effort swallow then hides the failure, leaving every
        // `?zip=1` push silently missing `repo.zip`. Omit the header
        // outside S3 so the zip artifact lands successfully.
        let user_metadata = match kind {
            BackendKind::S3 => vec![(
                "codepipeline-artifact-revision-summary".to_owned(),
                sanitize_metadata_value(&artifacts.commit_msg),
            )],
            BackendKind::Azure => Vec::new(),
        };
        let opts = PutOpts {
            content_disposition: Some(format!(
                "attachment; filename=repo-{}.zip",
                artifacts.short_sha
            )),
            user_metadata,
            progress: Some(bundle_progress_sink(&zip_dest, zip_total)),
        };
        // Awaited without `?`: the push outcome below is `Ok` whatever
        // this call returns.
        upload_zip_artifact_best_effort(
            store,
            &remote_ref,
            &zip_dest,
            &artifacts.archive_path,
            opts,
        )
        .await;
    }

    Ok(PushOutcome::Ok {
        remote_ref: remote_ref_str,
    })
}
1610
1611/// Build a [`ProgressSink`] that emits one structured `tracing::info!`
1612/// line per chunk transferred during a bundle / zip-archive upload.
1613///
1614/// Issue #55. Git's helper protocol has no upload-progress channel
1615/// (the helper-protocol stdout is reserved for protocol traffic per
1616/// `.claude/rules/protocol-stdout.md`), so the bundle path's only way
1617/// to inform the user during a multi-GiB upload is `tracing::info!`,
1618/// which routes to stderr via the tracing-subscriber initialised in
1619/// `main()`. The LFS path keeps its own sink wiring (one
1620/// progress-event JSON line per chunk on stdout, governed by the LFS
1621/// custom-transfer protocol).
1622///
1623/// `total` is the bundle size at the moment we stat'd it. A short
1624/// race against a writer that re-stats during multipart planning
1625/// would only mis-format the log line — it cannot drive wrong-byte
1626/// behaviour. `None` renders "unknown" so the call site can
1627/// gracefully degrade when stat'ing the source fails.
1628///
1629/// The granularity of events is whatever the backend's `put_path`
1630/// provides: one event per completed multipart part / staged block
1631/// for bodies above [`crate::object_store::multipart::MULTIPART_PUT_THRESHOLD`],
1632/// one event total for bodies below it. With the default 16 MiB
1633/// part size and 8-way concurrency, a 1 GiB bundle emits ~64 lines
1634/// — ample motion to spot a stall, far short of log spam.
1635pub(crate) fn bundle_progress_sink(key: &str, total: Option<u64>) -> ProgressSink {
1636 // The closure needs an owned `String` because `ProgressSink` is
1637 // `'static`; cloning here keeps the call sites' borrow intact so
1638 // they can pass `&bundle_dest` to `put_path` without juggling
1639 // ownership.
1640 let key = key.to_owned();
1641 let bytes_so_far = Arc::new(AtomicU64::new(0));
1642 ProgressSink::new(move |bytes_amount| {
1643 // `Ordering::Relaxed` is enough: we're not synchronising with
1644 // any other state, just maintaining a monotonic counter for
1645 // log lines. The worst a re-ordered store could do is print
1646 // an out-of-order count, which `tracing` already disclaims.
1647 let so_far = bytes_so_far
1648 .fetch_add(bytes_amount, Ordering::Relaxed)
1649 .saturating_add(bytes_amount);
1650 // Render `total` as a Display value so the field is omitted
1651 // when stat'ing the source failed at the call site. Tracing's
1652 // `Option<u64>` rendering would print "None" — uglier than a
1653 // bare absence of the field.
1654 if let Some(t) = total {
1655 info!(
1656 key = %key,
1657 bytes_so_far = so_far,
1658 total = t,
1659 bytes_chunk = bytes_amount,
1660 "uploading"
1661 );
1662 } else {
1663 info!(
1664 key = %key,
1665 bytes_so_far = so_far,
1666 bytes_chunk = bytes_amount,
1667 "uploading"
1668 );
1669 }
1670 })
1671}
1672
/// Return a copy of `s` in which every control character is replaced by
/// a single space, so the value cannot break out of an HTTP header.
///
/// "Control character" means whatever [`char::is_control`] accepts: the
/// ASCII controls (including `\r`, `\n` and `\t`), DEL, and the C1 range
/// U+0080–U+009F. Every other character — including non-ASCII text — is
/// passed through untouched, and the character count is preserved.
///
/// The input is the commit summary from `git::last_commit_message`
/// ("everything before the first blank line" per gix), which ends up in
/// the `codepipeline-artifact-revision-summary` user-metadata header of
/// the zip-archive upload. A hostile commit could smuggle `\r\n` into
/// that summary; on the wire that is a CRLF injection, splitting one
/// header into two and forging extra user-metadata headers. Both
/// backends' SDKs currently reject CRLF at the transport layer, but that
/// protection is version-dependent and fails with an opaque
/// "invalid header" 400. Scrubbing here yields a clean, predictable
/// value instead.
fn sanitize_metadata_value(s: &str) -> String {
    s.replace(char::is_control, " ")
}
1692
/// Expected key count for a non-zip ref: one bundle object.
///
/// Compared by `delete_remote_ref_under_lock` against the ref listing
/// after the held lock key has been filtered out.
const DELETE_EXPECTED_NO_ZIP: usize = 1;
/// Expected key count for a zip ref: one bundle + one archive object.
///
/// Compared by `delete_remote_ref_under_lock` against the ref listing
/// after the held lock key has been filtered out.
const DELETE_EXPECTED_WITH_ZIP: usize = 2;
1697
1698/// Handle a delete refspec (`:<remote_ref>`) UNDER the per-ref lock
1699/// acquired by [`push_one`]: list `<prefix>/<ref>/`, expect 1 (or 2
1700/// with zip) keys after filtering out the lock we hold, delete them
1701/// all, emit `ok` or the appropriate error.
1702///
1703/// Issue #133: this must run inside the lock window. A pre-lock
1704/// listing-then-sweep races a concurrent push that lands a new bundle
1705/// between the listing and the deletion, producing a silent false
1706/// success — the delete reports `ok` to git while the ref survives
1707/// with a different bundle on the server.
1708///
1709/// The lock key (`<prefix>/<ref>/LOCK#.lock`) is filtered from the
1710/// listing — `release_lock` removes it last, after this function
1711/// returns. Apart from that one filter, the listing is unfiltered and
1712/// counts `PROTECTED#` and `repo.zip` against the expected total.
1713///
1714/// Issue #128: the `PROTECTED#` marker check is the FIRST guard, run
1715/// against the fresh under-lock listing BEFORE any count-vs-expected
1716/// dispatch. The pre-#128 ordering only consulted the marker in the
1717/// `else if` mismatch branch, so a count-matching listing (e.g.
1718/// `[bundle.bundle, PROTECTED#]` in zip mode, or `[PROTECTED#]` alone
1719/// in non-zip mode) would sweep the marker and report `ok`. With the
1720/// guard at the top, that bypass is closed.
1721///
1722/// Four behaviours fall out:
1723///
1724/// 1. **Protected ref** — listing (lock filtered out) includes a key
1725/// whose final segment is the [`keys::PROTECTED_MARKER_SEGMENT`].
1726/// Emit a protection-specific refusal naming the management CLI's
1727/// `unprotect` workflow.
1728/// 2. **Count matches `expected`, no marker** — sweep the entries and
1729/// report `ok`.
1730/// 3. **No bundle present** — listing (lock filtered out) is empty.
1731/// Returns the `"not found"?` wire error. This now includes the case
1732/// of a ref whose only on-server state was a stale `LOCK#.lock`:
1733/// `acquire_lock` recovers the stale lock, the post-lock listing
1734/// contains only our newly-held lock, and the filter renders it
1735/// empty.
1736/// 4. **Genuine multi-bundle / corruption** — count exceeds `expected`
1737/// and no marker is present. Fall through to the doctor message.
1738/// Example listing:
1739/// `[ "<prefix>/refs/heads/main/<sha-a>.bundle",
1740/// "<prefix>/refs/heads/main/<sha-b>.bundle" ]`.
1741async fn delete_remote_ref_under_lock(
1742 store: &dyn ObjectStore,
1743 prefix: Option<&str>,
1744 remote_ref: &RefName,
1745 zip: bool,
1746 lock_key: &str,
1747) -> Result<PushOutcome, PushError> {
1748 let listing = ref_listing_prefix(prefix, remote_ref);
1749 let all_entries = store.list(&listing).await?;
1750 // Issue #133: filter out the lock key we hold. `release_lock`
1751 // removes it last; the sweep below must not touch it (deleting our
1752 // own lock mid-critical-section would let concurrent clients
1753 // acquire it). The remaining count is what the protocol's
1754 // count-vs-expected dispatch operates on.
1755 let entries: Vec<&ObjectMeta> = all_entries.iter().filter(|e| e.key != lock_key).collect();
1756 let expected = if zip {
1757 DELETE_EXPECTED_WITH_ZIP
1758 } else {
1759 DELETE_EXPECTED_NO_ZIP
1760 };
1761 let remote_ref_str = remote_ref.as_str().to_owned();
1762 // Issue #128: the canonical protection guard. Run it FIRST against
1763 // the fresh, under-lock listing, BEFORE the count-match deletion
1764 // branch. The pre-#128 ordering only checked for the marker in the
1765 // `else if` mismatch branch, so a count-matching listing (e.g.
1766 // `[bundle.bundle, PROTECTED#]` with `expected = 2` in zip mode, or
1767 // `[PROTECTED#]` alone with `expected = 1`) would sweep the marker
1768 // and complete the delete silently. `entries_have_protected_marker`
1769 // matches only the literal `PROTECTED#` last segment — never the
1770 // `LOCK#.lock` lock key — so scanning the unfiltered `all_entries`
1771 // here is safe and avoids re-deriving a filtered view solely for
1772 // this check.
1773 if keys::entries_have_protected_marker(&all_entries) {
1774 return Ok(PushOutcome::Error {
1775 remote_ref: remote_ref_str,
1776 message: DELETE_PROTECTION_MESSAGE.to_owned(),
1777 });
1778 }
1779 if entries.len() == expected {
1780 for entry in &entries {
1781 delete_idempotent(store, &entry.key).await?;
1782 }
1783 // Issue #151 defence-in-depth: confirm no `PROTECTED#` marker
1784 // sneaked in. The lock window is still open (the caller releases
1785 // it after we return), so a `protect`/`unprotect` racing this
1786 // delete would be blocked on the lock per #159. Finding a marker
1787 // here would indicate a contract violation; the helper logs at
1788 // `error!` and the delete still reports `ok` — see the helper
1789 // doc for the rationale.
1790 verify_no_orphan_protected_after_delete(store, prefix, remote_ref).await;
1791 Ok(PushOutcome::Ok {
1792 remote_ref: remote_ref_str,
1793 })
1794 } else if entries.is_empty() {
1795 Ok(PushOutcome::Error {
1796 remote_ref: remote_ref_str,
1797 message: r#""not found"?"#.to_owned(),
1798 })
1799 } else {
1800 // Genuine multi-bundle / corruption: `entries` has more than
1801 // `expected` items and no `PROTECTED#` marker (that case
1802 // returned at the top guard above). Fall through to the doctor
1803 // message — see issue #128 for the routing rationale.
1804 Ok(PushOutcome::Error {
1805 remote_ref: remote_ref_str,
1806 message:
1807 r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
1808 .to_owned(),
1809 })
1810 }
1811}
1812
1813#[cfg(test)]
1814mod tests {
1815 use super::*;
1816 use crate::object_store::mock::MockStore;
1817 use crate::packchain::gc::baseline_tombstone_listing_prefix;
1818
1819 const SHA: &str = "0123456789abcdef0123456789abcdef01234567";
1820 /// A second 40-char hex SHA distinct from `SHA`. Used by tests that
1821 /// need to seed pre-existing state under a SHA different from
1822 /// `local_sha` so a regression cannot pass through coincidental key
1823 /// alignment between `pre_existing` and `bundle_dest`.
1824 const OTHER_SHA: &str = "ffffffffffffffffffffffffffffffffffffffff";
1825
1826 /// Compile-time guard: replaces the runtime `assert_ne!(other_sha, SHA, …)`
1827 /// that the constant lift removed. A typo making the two consts equal
1828 /// fails the build rather than silently letting a stale-remote test
1829 /// pass for the wrong reason.
1830 const _: () = {
1831 let a = SHA.as_bytes();
1832 let b = OTHER_SHA.as_bytes();
1833 let mut i = 0;
1834 let mut differs = a.len() != b.len();
1835 while i < a.len() && i < b.len() {
1836 if a[i] != b[i] {
1837 differs = true;
1838 }
1839 i += 1;
1840 }
1841 assert!(differs, "OTHER_SHA must differ from SHA");
1842 };
1843
    /// Test shorthand: build a [`RefName`] from `s`, panicking (via
    /// `expect`) when `RefName::new` rejects the input.
    fn rn(s: &str) -> RefName {
        RefName::new(s).expect("RefName")
    }
1848 // --- parse_push_args ----------------------------------------------
1849
1850 #[test]
1851 fn parse_push_args_accepts_canonical_form() {
1852 let spec = parse_push_args("refs/heads/main:refs/heads/main").expect("parse");
1853 assert!(!spec.force);
1854 assert_eq!(spec.local_spec, "refs/heads/main");
1855 assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
1856 }
1857
1858 #[test]
1859 fn parse_push_args_accepts_force_flag() {
1860 let spec = parse_push_args("+refs/heads/main:refs/heads/main").expect("parse");
1861 assert!(spec.force);
1862 assert_eq!(spec.local_spec, "refs/heads/main");
1863 }
1864
1865 #[test]
1866 fn parse_push_args_accepts_delete_form() {
1867 let spec = parse_push_args(":refs/heads/main").expect("parse");
1868 assert!(!spec.force);
1869 assert!(spec.local_spec.is_empty());
1870 assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
1871 }
1872
1873 #[test]
1874 fn parse_push_args_accepts_force_delete_form() {
1875 let spec = parse_push_args("+:refs/heads/main").expect("parse");
1876 assert!(spec.force);
1877 assert!(spec.local_spec.is_empty());
1878 assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
1879 }
1880
1881 #[test]
1882 fn parse_push_args_accepts_short_local() {
1883 let spec = parse_push_args("HEAD:refs/heads/main").expect("parse");
1884 assert_eq!(spec.local_spec, "HEAD");
1885 }
1886
1887 #[test]
1888 fn parse_push_args_rejects_missing_colon() {
1889 assert!(matches!(
1890 parse_push_args("refs/heads/main"),
1891 Err(PushError::Parse { .. })
1892 ));
1893 }
1894
1895 #[test]
1896 fn parse_push_args_rejects_empty_remote() {
1897 assert!(matches!(
1898 parse_push_args("refs/heads/main:"),
1899 Err(PushError::Parse { .. })
1900 ));
1901 }
1902
1903 #[test]
1904 fn parse_push_args_rejects_invalid_remote_ref() {
1905 assert!(matches!(
1906 parse_push_args("refs/heads/main:refs/heads/.bad"),
1907 Err(PushError::RemoteRef(_))
1908 ));
1909 }
1910
1911 #[test]
1912 fn parse_push_args_rejects_invalid_local_spec() {
1913 assert!(matches!(
1914 parse_push_args("refs/heads/.bad:refs/heads/main"),
1915 Err(PushError::InvalidLocalSpec(_))
1916 ));
1917 }
1918
1919 #[test]
1920 fn parse_push_args_rejects_embedded_whitespace() {
1921 assert!(matches!(
1922 parse_push_args("refs/heads/main:refs/heads/main extra"),
1923 Err(PushError::Parse { .. })
1924 ));
1925 }
1926
1927 #[test]
1928 fn parse_push_args_rejects_empty_input() {
1929 assert!(matches!(parse_push_args(""), Err(PushError::Parse { .. })));
1930 }
1931
1932 // --- key formatting -----------------------------------------------
1933
1934 #[test]
1935 fn key_formatters_with_prefix() {
1936 let r = rn("refs/heads/main");
1937 let sha = Sha::from_hex(SHA).unwrap();
1938 assert_eq!(
1939 keys::bundle_key(Some("repo"), &r, sha),
1940 format!("repo/refs/heads/main/{SHA}.bundle"),
1941 );
1942 assert_eq!(
1943 lock_key(Some("repo"), &r),
1944 "repo/refs/heads/main/LOCK#.lock"
1945 );
1946 assert_eq!(
1947 archive_key(Some("repo"), &r),
1948 "repo/refs/heads/main/repo.zip"
1949 );
1950 assert_eq!(head_key(Some("repo")), "repo/HEAD");
1951 }
1952
1953 #[test]
1954 fn key_formatters_with_no_prefix() {
1955 let r = rn("refs/heads/main");
1956 let sha = Sha::from_hex(SHA).unwrap();
1957 assert_eq!(
1958 keys::bundle_key(None, &r, sha),
1959 format!("refs/heads/main/{SHA}.bundle"),
1960 );
1961 assert_eq!(lock_key(None, &r), "refs/heads/main/LOCK#.lock");
1962 assert_eq!(archive_key(None, &r), "refs/heads/main/repo.zip");
1963 assert_eq!(head_key(None), "HEAD");
1964 // Empty-string prefix is treated identically to None.
1965 assert_eq!(head_key(Some("")), "HEAD");
1966 assert_eq!(lock_key(Some(""), &r), "refs/heads/main/LOCK#.lock");
1967 }
1968
1969 // --- bundle filter ------------------------------------------------
1970
1971 #[test]
1972 fn is_bundle_candidate_keeps_real_bundles() {
1973 assert!(is_bundle_candidate(&format!(
1974 "repo/refs/heads/main/{SHA}.bundle"
1975 )));
1976 assert!(is_bundle_candidate(&format!(
1977 "refs/heads/main/{SHA}.bundle"
1978 )));
1979 }
1980
1981 #[test]
1982 fn is_bundle_candidate_rejects_protected_zip_lock() {
1983 assert!(!is_bundle_candidate("repo/refs/heads/main/PROTECTED#"));
1984 assert!(!is_bundle_candidate("repo/refs/heads/main/repo.zip"));
1985 assert!(!is_bundle_candidate("repo/refs/heads/main/LOCK#.lock"));
1986 assert!(!is_bundle_candidate("repo/refs/heads/main/file.lock"));
1987 assert!(!is_bundle_candidate("repo/refs/heads/main/LOCKS/x"));
1988 }
1989
1990 /// Regression: refs whose names embed `.zip` as a substring must
1991 /// not be filtered out. Previously the predicate rejected any key
1992 /// containing `.zip` anywhere in the byte sequence.
1993 #[test]
1994 fn is_bundle_candidate_keeps_refs_containing_zip_substring() {
1995 assert!(is_bundle_candidate(&format!(
1996 "repo/refs/heads/v1.zip-rc1/{SHA}.bundle"
1997 )));
1998 assert!(is_bundle_candidate(&format!(
1999 "refs/heads/myrelease.zip-v1/{SHA}.bundle"
2000 )));
2001 }
2002
2003 /// Regression: refs whose names embed `LOCKS` as a substring must
2004 /// not be filtered out. Previously the predicate rejected any key
2005 /// containing `/LOCKS/` anywhere in the byte sequence.
2006 #[test]
2007 fn is_bundle_candidate_keeps_refs_containing_locks_substring() {
2008 assert!(is_bundle_candidate(&format!(
2009 "repo/refs/heads/LOCKS-feature/x/{SHA}.bundle"
2010 )));
2011 assert!(is_bundle_candidate(&format!(
2012 "refs/heads/LOCKS/sub/{SHA}.bundle"
2013 )));
2014 }
2015
2016 /// Regression: a ref-name segment ending in `.lock` is permitted by
2017 /// `gix_validate`; bundle keys under such refs must still match.
2018 /// The unwanted `.lock` sibling is `<ref>/LOCK#.lock`, where the
2019 /// final segment is `LOCK#.lock`, not `<sha>.bundle`.
2020 #[test]
2021 fn is_bundle_candidate_keeps_refs_containing_lock_substring() {
2022 assert!(is_bundle_candidate(&format!(
2023 "refs/heads/feature.lock-rc/{SHA}.bundle"
2024 )));
2025 }
2026
2027 // --- parse_remote_sha_from_key ------------------------------------
2028
2029 #[test]
2030 fn parse_remote_sha_from_key_extracts_lower_hex_40() {
2031 let sha = parse_remote_sha_from_key(&format!("repo/refs/heads/main/{SHA}.bundle"))
2032 .expect("parse");
2033 assert_eq!(sha.to_string(), SHA);
2034 }
2035
2036 #[test]
2037 fn parse_remote_sha_from_key_rejects_uppercase() {
2038 let upper = SHA.to_uppercase();
2039 assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{upper}.bundle")).is_none());
2040 }
2041
2042 #[test]
2043 fn parse_remote_sha_from_key_rejects_wrong_length() {
2044 let short = &SHA[..39];
2045 assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{short}.bundle")).is_none());
2046 }
2047
2048 #[test]
2049 fn parse_remote_sha_from_key_rejects_missing_extension() {
2050 assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{SHA}")).is_none());
2051 }
2052
2053 // --- bundles_for_ref / is_protected ------------------------------
2054
2055 #[tokio::test]
2056 async fn bundles_for_ref_filters_protected_zip_lock() {
2057 let store = MockStore::new();
2058 let r = rn("refs/heads/main");
2059 store.insert(
2060 format!("repo/refs/heads/main/{SHA}.bundle"),
2061 Bytes::from_static(b"b"),
2062 );
2063 store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
2064 store.insert("repo/refs/heads/main/repo.zip", Bytes::from_static(b""));
2065 store.insert("repo/refs/heads/main/LOCK#.lock", Bytes::from_static(b""));
2066 let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
2067 .await
2068 .unwrap();
2069 assert_eq!(bundles.len(), 1);
2070 assert!(bundles[0].key.ends_with(".bundle"));
2071 }
2072
2073 /// Regression for #109: a ref whose name contains `.zip` must
2074 /// not have its bundle silently filtered out.
2075 #[tokio::test]
2076 async fn bundles_for_ref_keeps_bundle_when_ref_name_contains_zip() {
2077 let store = MockStore::new();
2078 let r = rn("refs/heads/v1.zip-rc1");
2079 let bundle_key = format!("repo/refs/heads/v1.zip-rc1/{SHA}.bundle");
2080 store.insert(bundle_key.clone(), Bytes::from_static(b"b"));
2081 let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
2082 .await
2083 .unwrap();
2084 assert_eq!(bundles.len(), 1);
2085 assert_eq!(bundles[0].key, bundle_key);
2086 }
2087
2088 /// Regression for #109: a ref whose name contains `LOCKS` must
2089 /// not have its bundle silently filtered out.
2090 #[tokio::test]
2091 async fn bundles_for_ref_keeps_bundle_when_ref_name_contains_locks() {
2092 let store = MockStore::new();
2093 let r = rn("refs/heads/LOCKS-feature/x");
2094 let bundle_key = format!("repo/refs/heads/LOCKS-feature/x/{SHA}.bundle");
2095 store.insert(bundle_key.clone(), Bytes::from_static(b"b"));
2096 let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
2097 .await
2098 .unwrap();
2099 assert_eq!(bundles.len(), 1);
2100 assert_eq!(bundles[0].key, bundle_key);
2101 }
2102
    /// Issue #165: a caller-supplied `cached_hidden` set must satisfy
    /// the tombstone lookup — `bundles_for_ref` must NOT re-list
    /// `<prefix>/gc/` or fetch any tombstone body. The decorator below
    /// counts `list` calls for exactly `repo/gc/` and `get_bytes` calls
    /// for keys under the baseline-tombstone prefix, so a regression
    /// that drops the cache and re-walks the tombstone set fails this
    /// test.
    #[tokio::test]
    async fn bundles_for_ref_skips_tombstone_lookup_when_cache_provided() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Wraps a `MockStore` and records how often the tombstone
        // machinery is consulted. Every other operation is forwarded
        // untouched.
        struct CountingStore {
            inner: MockStore,
            // Number of `list` calls whose prefix is exactly `repo/gc/`.
            gc_lists: AtomicUsize,
            // Number of `get_bytes` calls for tombstone-prefixed keys.
            tombstone_gets: AtomicUsize,
        }

        // `put_path` is intentionally omitted from the forward list to
        // preserve the original behavior (trait default forwarding via
        // `Self::put_bytes`); this decorator never has `put_path` called
        // on it in practice.
        crate::delegate_to_inner_impl! {
            impl ObjectStore for CountingStore {
                forward: get_to_file, get_bytes_range,
                         put_bytes, put_if_absent,
                         head, copy, delete;

                async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
                    // Only the gc listing is counted; listing the ref
                    // itself is expected on both paths.
                    if prefix == "repo/gc/" {
                        self.gc_lists.fetch_add(1, Ordering::SeqCst);
                    }
                    self.inner.list(prefix).await
                }

                async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
                    if key.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))) {
                        self.tombstone_gets.fetch_add(1, Ordering::SeqCst);
                    }
                    self.inner.get_bytes(key).await
                }
            }
        }

        let inner = MockStore::new();
        let r = rn("refs/heads/main");
        // Seed: a bundle for the ref plus a baseline tombstone naming a
        // *different* SHA. The tombstone exists so a non-cached call
        // would have to fetch & parse it to compute the hide set.
        inner.insert(
            format!("repo/refs/heads/main/{SHA}.bundle"),
            Bytes::from_static(b"b"),
        );
        // Plausible tombstone body — `tombstoned_bundle_keys` parses it
        // as `BaselineTombstone` JSON. Body shape mirrors the one
        // written by `write_baseline_tombstone_unconditional`; `v`
        // must be `TOMBSTONE_SCHEMA_VERSION` or the parser rejects it
        // and the tombstone is silently skipped (defeating the test).
        let tomb_body = format!(
            r#"{{"v":1,"ref_name":"refs/heads/main","sha":"{OTHER_SHA}","marked_at":"2024-01-01T00:00:00Z"}}"#
        );
        inner.insert(
            format!(
                "{}test.json",
                baseline_tombstone_listing_prefix(Some("repo"))
            ),
            Bytes::from(tomb_body),
        );

        let store = CountingStore {
            inner,
            gc_lists: AtomicUsize::new(0),
            tombstone_gets: AtomicUsize::new(0),
        };

        // Baseline: a `None` cache pays for one `gc/` list + one
        // tombstone GET. Asserting these counts pins the un-optimised
        // shape so the contrast with the cached path is meaningful.
        let bundles_uncached = bundles_for_ref(&store, Some("repo"), &r, None)
            .await
            .unwrap();
        assert_eq!(bundles_uncached.len(), 1);
        assert_eq!(store.gc_lists.load(Ordering::SeqCst), 1);
        assert_eq!(store.tombstone_gets.load(Ordering::SeqCst), 1);

        // Cached path: hand the same hide set we already paid for.
        // `bundles_for_ref` must consult it and skip the `gc/` walk
        // entirely. A regression that drops the cache and re-lists
        // would bump these counters past 1.
        let hidden: HashSet<String> = [format!("repo/refs/heads/main/{OTHER_SHA}.bundle")]
            .into_iter()
            .collect();
        let bundles_cached = bundles_for_ref(&store, Some("repo"), &r, Some(&hidden))
            .await
            .unwrap();
        assert_eq!(bundles_cached.len(), 1);
        // Both counters must still read 1, i.e. unchanged since the
        // uncached call above.
        assert_eq!(
            store.gc_lists.load(Ordering::SeqCst),
            1,
            "cached call must not re-list gc/",
        );
        assert_eq!(
            store.tombstone_gets.load(Ordering::SeqCst),
            1,
            "cached call must not refetch any tombstone body",
        );
    }
2207
2208 #[tokio::test]
2209 async fn is_protected_detects_marker() {
2210 let store = MockStore::new();
2211 let r = rn("refs/heads/main");
2212 assert!(!is_protected(&store, Some("repo"), &r).await.unwrap());
2213 store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
2214 assert!(is_protected(&store, Some("repo"), &r).await.unwrap());
2215 }
2216
2217 /// Regression for #119: only the exact `PROTECTED#` key counts as a
2218 /// protection marker. A sibling key that merely starts with
2219 /// `PROTECTED#` (e.g. `PROTECTED#audit`) must not flip the result.
2220 #[tokio::test]
2221 async fn is_protected_ignores_protected_prefixed_sibling() {
2222 let store = MockStore::new();
2223 let r = rn("refs/heads/main");
2224 store.insert(
2225 "repo/refs/heads/main/PROTECTED#audit",
2226 Bytes::from_static(b""),
2227 );
2228 assert!(!is_protected(&store, Some("repo"), &r).await.unwrap());
2229 }
2230
2231 /// Regression for #119: `is_protected` must use `head`, not `list`.
2232 /// Arm `AccessDeniedOnAnyList`; if the implementation calls `list`
2233 /// at all, the fault fires and the call returns an error. We assert
2234 /// success and that the fault is still pending (i.e. unfired).
2235 #[tokio::test]
2236 async fn is_protected_uses_head_not_list() {
2237 use crate::object_store::mock::Fault;
2238 let store = MockStore::new();
2239 let r = rn("refs/heads/main");
2240 store.arm(Fault::AccessDeniedOnAnyList);
2241 let got = is_protected(&store, Some("repo"), &r).await.unwrap();
2242 assert!(!got);
2243 assert_eq!(store.pending_faults(), 1, "is_protected must not call list");
2244 // And the positive case too — still no list.
2245 store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
2246 let got = is_protected(&store, Some("repo"), &r).await.unwrap();
2247 assert!(got);
2248 assert_eq!(store.pending_faults(), 1, "is_protected must not call list");
2249 }
2250
2251 // --- acquire_lock / release_lock ----------------------------------
2252 //
2253 // Issue #118: `acquire_lock` returns a `LockGuard` instead of a
2254 // plain bool because the lock now carries a background heartbeat
2255 // task. Tests construct an `Arc<dyn ObjectStore>` so the heartbeat
2256 // task can clone the store; `MockStore`'s internal state is
2257 // already `Arc<Mutex<...>>`-shared, so the `Arc` clone is shape
2258 // bookkeeping, not extra state.
2259
2260 #[tokio::test]
2261 async fn acquire_lock_succeeds_when_absent() {
2262 let store = Arc::new(MockStore::new());
2263 let now = OffsetDateTime::now_utc();
2264 let guard = acquire_lock(
2265 Arc::clone(&store) as Arc<dyn ObjectStore>,
2266 "k",
2267 Duration::seconds(60),
2268 now,
2269 )
2270 .await
2271 .unwrap();
2272 assert!(guard.is_some(), "expected a fresh guard");
2273 assert!(store.contains("k"));
2274 // Drop the guard so the heartbeat task exits before the
2275 // runtime tears down (also covered explicitly by
2276 // `lock_guard_drop_aborts_heartbeat`).
2277 drop(guard);
2278 }
2279
2280 #[tokio::test]
2281 async fn acquire_lock_returns_none_when_recently_held() {
2282 let store = Arc::new(MockStore::new());
2283 let now = OffsetDateTime::now_utc();
2284 store.insert_with("k", Bytes::new(), now, PutOpts::default());
2285 let guard = acquire_lock(
2286 Arc::clone(&store) as Arc<dyn ObjectStore>,
2287 "k",
2288 Duration::seconds(60),
2289 now,
2290 )
2291 .await
2292 .unwrap();
2293 assert!(guard.is_none(), "expected contention");
2294 }
2295
2296 #[tokio::test]
2297 async fn acquire_lock_recovers_stale_lock() {
2298 let store = Arc::new(MockStore::new());
2299 let now = OffsetDateTime::now_utc();
2300 let stale = now - Duration::seconds(120);
2301 store.insert_with("k", Bytes::new(), stale, PutOpts::default());
2302 let guard = acquire_lock(
2303 Arc::clone(&store) as Arc<dyn ObjectStore>,
2304 "k",
2305 Duration::seconds(60),
2306 now,
2307 )
2308 .await
2309 .unwrap();
2310 assert!(guard.is_some(), "stale lock must be recoverable");
2311 // Lock still exists (we re-created it with put_if_absent).
2312 assert!(store.contains("k"));
2313 drop(guard);
2314 }
2315
2316 #[tokio::test]
2317 async fn acquire_lock_treats_disappeared_lock_as_contention() {
2318 // First put_if_absent says "exists", but head returns NotFound
2319 // (race: another client released between the calls). We must
2320 // surface contention, not error.
2321 use crate::object_store::mock::Fault;
2322 let store = MockStore::new();
2323 store.insert("k", Bytes::new());
2324 store.arm(Fault::NotFoundOnHead { key: "k".into() });
2325 let arc = Arc::new(store);
2326 let now = OffsetDateTime::now_utc();
2327 let guard = acquire_lock(
2328 Arc::clone(&arc) as Arc<dyn ObjectStore>,
2329 "k",
2330 Duration::seconds(60),
2331 now,
2332 )
2333 .await
2334 .unwrap();
2335 assert!(guard.is_none(), "expected contention on disappeared lock");
2336 // Confirm head() was actually called — a regression that skipped
2337 // the staleness branch and returned None directly would also
2338 // satisfy the assertion above. The fault firing proves head ran.
2339 assert_eq!(arc.pending_faults(), 0);
2340 }
2341
2342 #[tokio::test]
2343 async fn release_lock_deletes_existing_key() {
2344 let store = Arc::new(MockStore::new());
2345 let now = OffsetDateTime::now_utc();
2346 let guard = acquire_lock(
2347 Arc::clone(&store) as Arc<dyn ObjectStore>,
2348 "k",
2349 Duration::seconds(60),
2350 now,
2351 )
2352 .await
2353 .unwrap()
2354 .expect("acquire_lock must succeed on an empty store");
2355 release_lock(guard).await.unwrap();
2356 assert!(!store.contains("k"));
2357 }
2358
2359 #[tokio::test]
2360 async fn release_lock_swallows_not_found_when_lock_already_gone() {
2361 // Acquire a lock, then delete the key out-of-band before
2362 // release_lock runs. The release must map NotFound → Ok(()).
2363 let store = Arc::new(MockStore::new());
2364 let now = OffsetDateTime::now_utc();
2365 let guard = acquire_lock(
2366 Arc::clone(&store) as Arc<dyn ObjectStore>,
2367 "k",
2368 Duration::seconds(60),
2369 now,
2370 )
2371 .await
2372 .unwrap()
2373 .expect("acquire_lock must succeed");
2374 // Cancel the heartbeat first so it cannot race the manual
2375 // delete and re-create the key.
2376 guard.heartbeat.as_ref().unwrap().abort();
2377 // Give the abort a chance to take effect, then remove the key.
2378 tokio::task::yield_now().await;
2379 let _ = store.delete("k").await;
2380 release_lock(guard).await.unwrap();
2381 }
2382
2383 #[tokio::test]
2384 async fn release_lock_propagates_non_not_found_errors() {
2385 use crate::object_store::mock::Fault;
2386 let store = Arc::new(MockStore::new());
2387 let now = OffsetDateTime::now_utc();
2388 let guard = acquire_lock(
2389 Arc::clone(&store) as Arc<dyn ObjectStore>,
2390 "k",
2391 Duration::seconds(60),
2392 now,
2393 )
2394 .await
2395 .unwrap()
2396 .expect("acquire_lock must succeed");
2397 store.arm(Fault::NetworkOnDelete { key: "k".into() });
2398 let err = release_lock(guard).await.unwrap_err();
2399 assert!(
2400 matches!(err, ObjectStoreError::Network(_)),
2401 "expected Network error, got {err:?}",
2402 );
2403 // The fault fired exactly once.
2404 assert_eq!(store.pending_faults(), 0);
2405 // Key remains because the delete was faulted, not executed.
2406 assert!(store.contains("k"));
2407 }
2408
/// Issue #118: a long-running critical section must not lose its
/// lock. The heartbeat refreshes `last_modified` faster than the
/// TTL expires, so a concurrent acquire after the original TTL
/// elapses still sees a live lock and returns `None` (contention).
///
/// The test runs on tokio's paused clock (`start_paused = true`) and
/// drives time forward manually with `tokio::time::advance`.
#[tokio::test(start_paused = true)]
async fn heartbeat_keeps_lock_alive_past_ttl() {
    let store = Arc::new(MockStore::new());
    let now = OffsetDateTime::now_utc();
    // TTL is 4 s. The loop below advances 5 × 3 s = 15 s of virtual
    // time, so a regression that disabled the heartbeat would have
    // multiple TTLs to expire under.
    // NOTE(review): this comment previously said the heartbeat fires
    // every 2 s, while `release_awaits_in_flight_heartbeat_put_before_delete`
    // states the interval is `ttl/3 = 1s` — confirm against
    // `heartbeat_interval`.
    let ttl = Duration::seconds(4);
    let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
        .await
        .unwrap()
        .expect("acquire must succeed");

    // Advance the clock past several TTLs, letting the heartbeat
    // task fire each time.
    for _ in 0..5 {
        tokio::time::advance(std::time::Duration::from_secs(3)).await;
        // Yield so the spawned heartbeat task can take its turn
        // on the runtime and PUT the lock key.
        tokio::task::yield_now().await;
    }

    // A concurrent acquire should see a recent `last_modified`
    // (refreshed by the heartbeat) and report contention instead of
    // stealing the lock as stale.
    // NOTE(review): `future` is a fresh wall-clock reading from
    // `OffsetDateTime::now_utc()`, the same source as `now` above; it
    // is not derived from the 15 s of virtual time advanced on the
    // paused tokio clock. If the mock stamps `last_modified` from the
    // wall clock, the lock would look younger than `ttl` here even
    // with the heartbeat disabled — confirm this assertion really
    // depends on the heartbeat.
    let future = OffsetDateTime::now_utc();
    let other = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, future)
        .await
        .unwrap();
    assert!(
        other.is_none(),
        "live lock must not be stealable while the holder's heartbeat runs",
    );

    // Releasing the original guard removes the lock key.
    release_lock(guard).await.unwrap();
    assert!(!store.contains("k"));
}
2454
2455 /// Releasing the guard must stop the heartbeat so no further PUTs
2456 /// hit the lock key after release. We assert by deleting the key
2457 /// post-release and confirming it stays gone.
2458 #[tokio::test(start_paused = true)]
2459 async fn release_lock_stops_heartbeat() {
2460 let store = Arc::new(MockStore::new());
2461 let now = OffsetDateTime::now_utc();
2462 let ttl = Duration::seconds(4);
2463 let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
2464 .await
2465 .unwrap()
2466 .expect("acquire must succeed");
2467 release_lock(guard).await.unwrap();
2468 assert!(!store.contains("k"));
2469
2470 // Advance well past multiple heartbeat intervals. A
2471 // regression that forgot to abort the task would re-create
2472 // the key via put_bytes.
2473 for _ in 0..5 {
2474 tokio::time::advance(std::time::Duration::from_secs(3)).await;
2475 tokio::task::yield_now().await;
2476 }
2477 assert!(
2478 !store.contains("k"),
2479 "heartbeat must not re-create the key after release",
2480 );
2481 }
2482
2483 /// Dropping the guard without calling `release_lock` aborts the
2484 /// heartbeat (so the lock becomes stealable after TTL) and leaves
2485 /// the lock key in place (a future caller's stale-recovery path
2486 /// reclaims it).
2487 #[tokio::test(start_paused = true)]
2488 async fn lock_guard_drop_aborts_heartbeat() {
2489 let store = Arc::new(MockStore::new());
2490 let now = OffsetDateTime::now_utc();
2491 let ttl = Duration::seconds(4);
2492 let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
2493 .await
2494 .unwrap()
2495 .expect("acquire must succeed");
2496 drop(guard);
2497
2498 // Capture the lock's last_modified right after drop —
2499 // heartbeats after this point would advance it. `head` is the
2500 // trait-level path to last_modified and avoids a test-only
2501 // accessor on MockStore.
2502 let after_drop = store.head("k").await.expect("lock present").last_modified;
2503
2504 // Advance time past multiple heartbeat intervals.
2505 for _ in 0..5 {
2506 tokio::time::advance(std::time::Duration::from_secs(3)).await;
2507 tokio::task::yield_now().await;
2508 }
2509
2510 let after_advance = store
2511 .head("k")
2512 .await
2513 .expect("lock still present")
2514 .last_modified;
2515 assert_eq!(
2516 after_drop, after_advance,
2517 "heartbeat must not refresh last_modified after drop",
2518 );
2519
2520 // And the lock is now stealable via the stale path: an acquire
2521 // with a `now` past TTL deletes the orphaned lock and reclaims.
2522 let future = now + Duration::seconds(120);
2523 let recovered = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, future)
2524 .await
2525 .unwrap();
2526 assert!(recovered.is_some(), "orphaned lock must be reclaimable");
2527 drop(recovered);
2528 }
2529
/// Issue #150 regression: a heartbeat `put_bytes` already in
/// flight when `release` is called must complete BEFORE the
/// release issues its DELETE. The pre-fix code did
/// `handle.abort(); delete`, which only cancelled the future at
/// its next await point — an in-flight network PUT continued on
/// the server and could settle AFTER the DELETE, resurrecting the
/// lock key as an orphan.
///
/// The test wraps the mock store in a barrier-gated `put_bytes`
/// (the first PUT records its start, then waits on a notify) so
/// the heartbeat tick lands in a state where the PUT has been
/// issued but not yet completed when `release` runs. The
/// post-condition is the sequence of recorded operations: the
/// in-flight PUT's completion must precede the DELETE's start.
/// Expected sequence is derived from the invariant in the
/// `LockGuard::release` doc comment (the spec), not from the
/// code's current output (lesson #5).
// The test body inlines a small `ObjectStore` decorator
// (`GatedPutStore`) so the trait wiring inflates the line count
// past clippy's default budget. Splitting the decorator into a
// sibling helper would obscure the test's single behaviour
// contract, so we accept the lint locally.
#[allow(clippy::too_many_lines)]
#[tokio::test(start_paused = true)]
async fn release_awaits_in_flight_heartbeat_put_before_delete() {
    use std::sync::Mutex;
    use tokio::sync::Notify;

    /// Op-log entry recording the relative order of `put_bytes`
    /// and `delete` events. The release contract requires
    /// `PutEnd` to precede `DeleteStart`.
    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
    enum Op {
        PutStart,
        PutEnd,
        DeleteStart,
        DeleteEnd,
    }

    /// Decorator that gates the FIRST `put_bytes` on a notify
    /// barrier. Subsequent `put_bytes` calls pass through (they
    /// should not happen — once we hold the gate, release should
    /// stop the heartbeat before another tick fires).
    struct GatedPutStore {
        // Wrapped mock that performs every real operation.
        inner: Arc<MockStore>,
        // Barrier the gated PUT parks on until the test opens it.
        put_gate: Arc<Notify>,
        // Shared op-log; the test asserts on its exact contents.
        log: Arc<Mutex<Vec<Op>>>,
        // Only `put_bytes` calls for this key are eligible for gating.
        gated_key: String,
        // Flipped to `true` by the first gated PUT so later PUTs pass
        // straight through.
        gate_consumed: std::sync::atomic::AtomicBool,
    }

    // Every method except `put_bytes` and `delete` forwards to `inner`
    // unchanged.
    #[async_trait::async_trait]
    impl ObjectStore for GatedPutStore {
        async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
            self.inner.list(prefix).await
        }
        async fn get_to_file(
            &self,
            key: &str,
            dest: &std::path::Path,
            opts: crate::object_store::GetOpts,
        ) -> Result<(), ObjectStoreError> {
            self.inner.get_to_file(key, dest, opts).await
        }
        async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
            self.inner.get_bytes(key).await
        }
        async fn get_bytes_range(
            &self,
            key: &str,
            range: std::ops::Range<u64>,
        ) -> Result<Bytes, ObjectStoreError> {
            self.inner.get_bytes_range(key, range).await
        }
        async fn put_bytes(
            &self,
            key: &str,
            body: Bytes,
            opts: PutOpts,
        ) -> Result<(), ObjectStoreError> {
            // Gate only the first PUT to `gated_key`: `swap` returns the
            // previous flag value, so exactly one caller observes `false`.
            let is_gated = key == self.gated_key
                && !self
                    .gate_consumed
                    .swap(true, std::sync::atomic::Ordering::SeqCst);
            if is_gated {
                // Record the start, then park until the test opens the
                // gate; the inner write has not happened yet.
                self.log.lock().unwrap().push(Op::PutStart);
                self.put_gate.notified().await;
            }
            let result = self.inner.put_bytes(key, body, opts).await;
            if is_gated {
                self.log.lock().unwrap().push(Op::PutEnd);
            }
            result
        }
        async fn put_path(
            &self,
            key: &str,
            src: &std::path::Path,
            opts: PutOpts,
        ) -> Result<(), ObjectStoreError> {
            self.inner.put_path(key, src, opts).await
        }
        async fn put_if_absent(
            &self,
            key: &str,
            body: Bytes,
        ) -> Result<bool, ObjectStoreError> {
            self.inner.put_if_absent(key, body).await
        }
        async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
            self.inner.head(key).await
        }
        async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
            self.inner.copy(src, dst).await
        }
        async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
            // Every delete (any key) is bracketed in the op-log.
            self.log.lock().unwrap().push(Op::DeleteStart);
            let result = self.inner.delete(key).await;
            self.log.lock().unwrap().push(Op::DeleteEnd);
            result
        }
    }

    let inner = Arc::new(MockStore::new());
    let put_gate = Arc::new(Notify::new());
    let log = Arc::new(Mutex::new(Vec::<Op>::new()));
    let store = Arc::new(GatedPutStore {
        inner: Arc::clone(&inner),
        put_gate: Arc::clone(&put_gate),
        log: Arc::clone(&log),
        gated_key: "k".to_owned(),
        gate_consumed: std::sync::atomic::AtomicBool::new(false),
    });

    let now = OffsetDateTime::now_utc();
    let ttl = Duration::seconds(4);
    let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
        .await
        .unwrap()
        .expect("acquire must succeed");

    // Yield so the heartbeat task is polled and consumes its
    // immediate (zero-duration) first `tick.tick()`. Without this
    // yield, the next `advance` would fire the immediate tick
    // first and then the periodic tick, but the task still
    // wouldn't be polled until the test yields again — we want
    // to be inside the loop body before advancing time.
    for _ in 0..4 {
        tokio::task::yield_now().await;
    }

    // Drive one heartbeat tick. The heartbeat task issues
    // `put_bytes`, which the gate intercepts and parks before the
    // inner write completes. The interval is `ttl/3 = 1s` so any
    // advance >= 1s is sufficient.
    // NOTE(review): `heartbeat_keeps_lock_alive_past_ttl` describes
    // the interval for the same 4 s TTL as 2 s — confirm against
    // `heartbeat_interval`. The 2 s advance below covers either value.
    tokio::time::advance(std::time::Duration::from_secs(2)).await;
    // Yield repeatedly so the spawned heartbeat task is polled
    // and reaches the gate.
    for _ in 0..16 {
        tokio::task::yield_now().await;
        if !log.lock().unwrap().is_empty() {
            break;
        }
    }
    assert_eq!(
        log.lock().unwrap().as_slice(),
        &[Op::PutStart],
        "heartbeat PUT must be in flight before release fires",
    );

    // Begin release on a separate task so we can observe the
    // ordering: release blocks on `stop_heartbeat`'s join-await,
    // which can only complete after the gated PUT finishes.
    let release_store = Arc::clone(&store);
    let release_handle = tokio::spawn(async move {
        // NOTE(review): with a `_` pattern this statement may not
        // actually move `release_store` into the block under
        // edition-2021 capture rules — confirm. `store` stays alive in
        // the enclosing test scope either way.
        let _ = release_store; // borrow check: keep store alive
        release_lock(guard).await
    });

    // Let the release task get as far as it can — up to the
    // join-await on the heartbeat task.
    for _ in 0..8 {
        tokio::task::yield_now().await;
    }
    // No DELETE may have fired yet: the heartbeat task is still
    // inside the gated `put_bytes`.
    assert_eq!(
        log.lock().unwrap().as_slice(),
        &[Op::PutStart],
        "release must NOT issue DELETE while heartbeat PUT is in flight",
    );

    // Open the gate: the in-flight PUT completes, the heartbeat
    // task exits the loop on its next shutdown re-check, and
    // release proceeds to DELETE.
    put_gate.notify_one();
    let result = release_handle.await.expect("release task panicked");
    result.expect("release_lock");

    // The full op-log must show the PUT finishing before the DELETE
    // starts, with no extra operations recorded.
    let final_log = log.lock().unwrap().clone();
    assert_eq!(
        final_log,
        vec![Op::PutStart, Op::PutEnd, Op::DeleteStart, Op::DeleteEnd],
        "operation order must be: heartbeat PUT completes, THEN release DELETE",
    );
    assert!(
        !inner.contains("k"),
        "lock key must be deleted after release"
    );
}
2740
2741 // --- delete_remote_ref_under_lock ---------------------------------
2742
2743 #[tokio::test]
2744 async fn delete_remote_ref_removes_single_bundle() {
2745 let store = MockStore::new();
2746 let r = rn("refs/heads/main");
2747 store.insert(
2748 format!("repo/refs/heads/main/{SHA}.bundle"),
2749 Bytes::from_static(b"b"),
2750 );
2751 let outcome = delete_remote_ref_under_lock(
2752 &store,
2753 Some("repo"),
2754 &r,
2755 false,
2756 "repo/refs/heads/main/LOCK#.lock",
2757 )
2758 .await
2759 .unwrap();
2760 assert_eq!(
2761 outcome,
2762 PushOutcome::Ok {
2763 remote_ref: "refs/heads/main".into()
2764 }
2765 );
2766 // Prefix-empty oracle (lesson 15): no key survives under the ref
2767 // prefix after a successful delete. Stronger than `!contains(bundle)`
2768 // because a regression that wrote a tombstone or other residue
2769 // (e.g. a re-introduction of c5468b4's bundle-engine tombstone)
2770 // would also trip it.
2771 let remaining: Vec<_> = store
2772 .keys()
2773 .into_iter()
2774 .filter(|k| k.starts_with("repo/refs/heads/main/"))
2775 .collect();
2776 assert!(
2777 remaining.is_empty(),
2778 "ref prefix must be empty after delete: {remaining:?}",
2779 );
2780 // Bundle-engine delete must not write a tombstone — tombstoning
2781 // is a packchain-only deferral (#143, #203).
2782 let gc_keys: Vec<_> = store
2783 .keys()
2784 .into_iter()
2785 .filter(|k| k.starts_with("repo/gc/"))
2786 .collect();
2787 assert!(
2788 gc_keys.is_empty(),
2789 "bundle-engine delete must not write a tombstone: {gc_keys:?}",
2790 );
2791 }
2792
2793 #[tokio::test]
2794 async fn delete_remote_ref_returns_not_found_when_empty() {
2795 let store = MockStore::new();
2796 let r = rn("refs/heads/main");
2797 let outcome = delete_remote_ref_under_lock(
2798 &store,
2799 Some("repo"),
2800 &r,
2801 false,
2802 "repo/refs/heads/main/LOCK#.lock",
2803 )
2804 .await
2805 .unwrap();
2806 match outcome {
2807 PushOutcome::Error { message, .. } => {
2808 assert_eq!(message, r#""not found"?"#);
2809 }
2810 PushOutcome::Ok { .. } => panic!("expected Error outcome"),
2811 }
2812 }
2813
2814 #[tokio::test]
2815 async fn delete_remote_ref_rejects_protected_marker() {
2816 // PROTECTED# is unfiltered for the delete-path count, but we
2817 // detect the marker before the generic multi-bundle error and
2818 // emit a protection-specific refusal that names `unprotect`.
2819 let store = MockStore::new();
2820 let r = rn("refs/heads/main");
2821 let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
2822 let protected = "repo/refs/heads/main/PROTECTED#";
2823 store.insert(&bundle, Bytes::from_static(b"b"));
2824 store.insert(protected, Bytes::from_static(b""));
2825 let outcome = delete_remote_ref_under_lock(
2826 &store,
2827 Some("repo"),
2828 &r,
2829 false,
2830 "repo/refs/heads/main/LOCK#.lock",
2831 )
2832 .await
2833 .unwrap();
2834 match outcome {
2835 PushOutcome::Error { message, .. } => {
2836 assert_eq!(
2837 message,
2838 r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2839 );
2840 }
2841 PushOutcome::Ok { .. } => panic!("expected Error outcome"),
2842 }
2843 // Both keys must remain — a regression that deleted on the way
2844 // to the error branch would still satisfy the message check.
2845 assert!(store.contains(&bundle));
2846 assert!(store.contains(protected));
2847 }
2848
2849 #[tokio::test]
2850 async fn delete_remote_ref_reports_corruption_without_protected_marker() {
2851 // Two bundles, no PROTECTED# marker → genuine corruption case
2852 // still falls through to the doctor message.
2853 let store = MockStore::new();
2854 let r = rn("refs/heads/main");
2855 let bundle_a = format!("repo/refs/heads/main/{SHA}.bundle");
2856 let bundle_b = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
2857 store.insert(&bundle_a, Bytes::from_static(b"a"));
2858 store.insert(&bundle_b, Bytes::from_static(b"b"));
2859 let outcome = delete_remote_ref_under_lock(
2860 &store,
2861 Some("repo"),
2862 &r,
2863 false,
2864 "repo/refs/heads/main/LOCK#.lock",
2865 )
2866 .await
2867 .unwrap();
2868 match outcome {
2869 PushOutcome::Error { message, .. } => {
2870 assert_eq!(
2871 message,
2872 r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#,
2873 );
2874 }
2875 PushOutcome::Ok { .. } => panic!("expected Error outcome"),
2876 }
2877 assert!(store.contains(&bundle_a));
2878 assert!(store.contains(&bundle_b));
2879 }
2880
2881 /// Issue #128: the canonical PROTECTED# guard must reject even when
2882 /// the count happens to match `expected`. Pre-#128 this listing
2883 /// `[bundle.bundle, PROTECTED#]` matched `expected = 2` in zip mode
2884 /// and was swept silently. Pin the guard: both keys survive, and
2885 /// the wire error is the protection-specific message — not the
2886 /// generic doctor message.
2887 #[tokio::test]
2888 async fn delete_remote_ref_rejects_protected_marker_when_count_matches_zip() {
2889 let store = MockStore::new();
2890 let r = rn("refs/heads/main");
2891 let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
2892 let protected = "repo/refs/heads/main/PROTECTED#";
2893 store.insert(&bundle, Bytes::from_static(b"b"));
2894 store.insert(protected, Bytes::from_static(b""));
2895 // zip = true → expected = 2, and the listing has exactly 2
2896 // entries (bundle + marker). Pre-#128 this fell through to the
2897 // count-match deletion branch.
2898 let outcome = delete_remote_ref_under_lock(
2899 &store,
2900 Some("repo"),
2901 &r,
2902 true,
2903 "repo/refs/heads/main/LOCK#.lock",
2904 )
2905 .await
2906 .unwrap();
2907 match outcome {
2908 PushOutcome::Error { message, .. } => {
2909 assert_eq!(
2910 message,
2911 r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2912 );
2913 }
2914 PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
2915 }
2916 assert!(store.contains(&bundle), "bundle must survive");
2917 assert!(store.contains(protected), "marker must survive");
2918 }
2919
2920 /// Issue #128: the protection guard must also fire when the lone
2921 /// remaining key under the ref prefix is the PROTECTED# marker
2922 /// itself. Pre-#128, `entries == [PROTECTED#]` matched `expected = 1`
2923 /// in non-zip mode and the marker was swept — the next push to the
2924 /// ref would then succeed against an unprotected branch even though
2925 /// the operator never ran `unprotect`. Pin the guard: the marker
2926 /// survives and the wire error is the protection-specific message.
2927 #[tokio::test]
2928 async fn delete_remote_ref_rejects_protected_marker_when_only_marker_present() {
2929 let store = MockStore::new();
2930 let r = rn("refs/heads/main");
2931 let protected = "repo/refs/heads/main/PROTECTED#";
2932 store.insert(protected, Bytes::from_static(b""));
2933 let outcome = delete_remote_ref_under_lock(
2934 &store,
2935 Some("repo"),
2936 &r,
2937 false,
2938 "repo/refs/heads/main/LOCK#.lock",
2939 )
2940 .await
2941 .unwrap();
2942 match outcome {
2943 PushOutcome::Error { message, .. } => {
2944 assert_eq!(
2945 message,
2946 r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2947 );
2948 }
2949 PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
2950 }
2951 assert!(store.contains(protected), "marker must survive");
2952 }
2953
2954 /// Issue #128: simulate the TOCTOU sequence the bug describes.
2955 /// Client A starts a delete; between `acquire_lock` and the
2956 /// under-lock listing, a marker appears at PROTECTED#. After #159
2957 /// `protect` itself acquires the same lock and cannot race here,
2958 /// but the under-lock listing remains the canonical guard against
2959 /// any other source of a same-key marker (a lock-bypass bug, a
2960 /// non-cooperating client). The fresh under-lock listing reflects
2961 /// the marker, so the canonical guard rejects the delete. Both the
2962 /// bundle and the marker survive.
2963 #[tokio::test]
2964 async fn delete_remote_ref_rejects_protect_landed_between_acquire_and_list() {
2965 let store = MockStore::new();
2966 let r = rn("refs/heads/main");
2967 let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
2968 let lock_key = "repo/refs/heads/main/LOCK#.lock";
2969 let protected = "repo/refs/heads/main/PROTECTED#";
2970 // Client A acquired the lock; bundle was already present.
2971 store.insert(&bundle, Bytes::from_static(b"b"));
2972 store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
2973 // A PROTECTED# marker appears AFTER our lock-acquire but BEFORE
2974 // our under-lock listing — exactly the window the pre-#128
2975 // ordering left open. Post-#159 `protect` cannot reach this
2976 // window itself, but the under-lock listing must still catch
2977 // markers from any other source (lock-bypass bug, non-cooperating
2978 // client).
2979 store.insert(protected, Bytes::from_static(b""));
2980
2981 let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
2982 .await
2983 .unwrap();
2984
2985 match outcome {
2986 PushOutcome::Error { message, .. } => {
2987 assert_eq!(
2988 message,
2989 r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
2990 );
2991 }
2992 PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
2993 }
2994 assert!(store.contains(&bundle), "bundle must survive");
2995 assert!(store.contains(protected), "marker must survive");
2996 assert!(store.contains(lock_key), "held lock must survive");
2997 }
2998
2999 #[tokio::test]
3000 async fn delete_remote_ref_zip_mode_expects_two_keys() {
3001 let store = MockStore::new();
3002 let r = rn("refs/heads/main");
3003 let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
3004 let zip = "repo/refs/heads/main/repo.zip";
3005 store.insert(&bundle, Bytes::from_static(b"b"));
3006 store.insert(zip, Bytes::from_static(b""));
3007 let outcome = delete_remote_ref_under_lock(
3008 &store,
3009 Some("repo"),
3010 &r,
3011 true,
3012 "repo/refs/heads/main/LOCK#.lock",
3013 )
3014 .await
3015 .unwrap();
3016 assert_eq!(
3017 outcome,
3018 PushOutcome::Ok {
3019 remote_ref: "refs/heads/main".into()
3020 }
3021 );
3022 assert!(!store.contains(&bundle));
3023 assert!(!store.contains(zip));
3024 }
3025
3026 // --- PushOutcome rendering ----------------------------------------
3027
/// An `Ok` outcome renders as `ok <ref>` followed by a newline.
#[test]
fn push_outcome_renders_ok_line() {
    let outcome = PushOutcome::Ok {
        remote_ref: "refs/heads/main".into(),
    };
    let rendered = outcome.to_protocol_line();
    assert_eq!(rendered, "ok refs/heads/main\n");
}
3036
/// An `Error` outcome renders as `error <ref> <message>` followed by a
/// newline, with the message text emitted verbatim.
#[test]
fn push_outcome_renders_error_line() {
    let outcome = PushOutcome::Error {
        remote_ref: "refs/heads/main".into(),
        message: r#""bad"?"#.into(),
    };
    let rendered = outcome.to_protocol_line();
    assert_eq!(rendered, "error refs/heads/main \"bad\"?\n");
}
3046
/// The two duplicate-bundle messages (the pre-lock one at ~line 482
/// and the under-lock one at ~line 600) must both render to wire
/// output ending in `"?\n`. The trailing `?` is the project-wide Rust
/// convention for `error <ref> "..."` messages: git treats `"..."?` as
/// recoverable and `"..."` as fatal, and both branches use the
/// recoverable form.
#[test]
fn duplicate_bundle_errors_use_consistent_wire_format() {
    let pre_lock = PushOutcome::Error {
        remote_ref: "refs/heads/main".into(),
        message:
            r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
                .to_owned(),
    };
    let under_lock = PushOutcome::Error {
        remote_ref: "refs/heads/main".into(),
        message: r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#.to_owned(),
    };
    let pre_lock_wire = pre_lock.to_protocol_line();
    let under_lock_wire = under_lock.to_protocol_line();

    // Exact wire text for each branch.
    assert_eq!(
        pre_lock_wire,
        "error refs/heads/main \"multiple bundles exist on server. Run git-remote-object-store doctor to fix.\"?\n",
    );
    assert_eq!(
        under_lock_wire,
        "error refs/heads/main \"multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix.\"?\n",
    );

    // Shared invariant: both end in the recoverable-suffix terminator.
    assert!(pre_lock_wire.ends_with("\"?\n"));
    assert!(under_lock_wire.ends_with("\"?\n"));
}
3080
3081 // --- lock_ttl_from_env --------------------------------------------
3082
/// Walks `ENV_LOCK_TTL_SECONDS` through unset, non-numeric, zero and a
/// positive value, checking `lock_ttl_from_env` and
/// `resolve_lock_ttl_seconds` at the relevant steps.
#[test]
fn lock_ttl_env_override_falls_back_for_unset_invalid_or_zero() {
    // All env-var cases live in this single test so they share one
    // `EnvGuard` and its per-key lock: the variable is process-global
    // and the guard serialises against `manage::doctor`'s test that
    // touches the same key. The guard's drop restores the previous
    // value on every exit path, assertion panics included.
    let ttl_env = crate::test_util::EnvGuard::take(ENV_LOCK_TTL_SECONDS);
    let fallback = Duration::seconds(i64::try_from(DEFAULT_LOCK_TTL_SECONDS).unwrap());

    // Variable unset → default TTL.
    ttl_env.clear();
    assert_eq!(lock_ttl_from_env(), fallback);
    // With nothing in the environment, `None` and `Some(0)` (issue
    // #208) both resolve through env-or-default to the default.
    let resolved_none = resolve_lock_ttl_seconds(None);
    assert_eq!(
        resolved_none, DEFAULT_LOCK_TTL_SECONDS,
        "None must defer to env-or-default",
    );
    let resolved_zero = resolve_lock_ttl_seconds(Some(0));
    assert_eq!(
        resolved_zero, DEFAULT_LOCK_TTL_SECONDS,
        "Some(0) must not defeat per-ref locking (issue #208)",
    );

    // Unparseable text → default TTL.
    ttl_env.set_to("not-a-number");
    assert_eq!(lock_ttl_from_env(), fallback);

    // Zero → default TTL (a zero TTL would defeat per-ref locking).
    ttl_env.set_to("0");
    assert_eq!(lock_ttl_from_env(), fallback);

    // A positive integer is honoured.
    ttl_env.set_to("120");
    assert_eq!(lock_ttl_from_env(), Duration::seconds(120));
    // While the env override is set, `None` and `Some(0)` pick it up —
    // the operator's env var must still apply when a CLI consumer
    // accidentally passes the wrong default.
    let resolved_none = resolve_lock_ttl_seconds(None);
    assert_eq!(resolved_none, 120, "None must honour env override");
    let resolved_zero = resolve_lock_ttl_seconds(Some(0));
    assert_eq!(resolved_zero, 120, "Some(0) must honour env override");
}
3129
3130 // --- saturating_duration_seconds (issue #221) ---------------------
3131
/// `u64::MAX` does not fit in an `i64`; the helper is expected to
/// return `Duration::seconds(i64::MAX)` instead of panicking on the
/// conversion. That value is the ~292-billion-year sentinel ceiling
/// shared by all TTL paths.
#[test]
fn saturating_duration_seconds_caps_at_i64_max() {
    let ceiling = Duration::seconds(i64::MAX);
    let clamped = saturating_duration_seconds(u64::MAX);
    assert_eq!(clamped, ceiling);
}
3142
/// An ordinary in-range value converts to the same number of seconds.
#[test]
fn saturating_duration_seconds_passes_normal_value() {
    let converted = saturating_duration_seconds(60);
    assert_eq!(converted, Duration::seconds(60));
}
3147
3148 // --- resolve_lock_ttl_seconds (issue #208) ------------------------
3149 //
3150 // `Compact::run_into` used to accept `Some(0)` and feed it straight
3151 // into the engine, bypassing the `lock_ttl_from_env` zero-clamp
3152 // from #112 and defeating per-ref locking. The shared resolver
3153 // collapses both `None` and `Some(0)` onto the env-or-default path
3154 // so the lock-acquiring call site cannot re-introduce the footgun.
3155 // `Doctor::resolved_lock_ttl_seconds` deliberately does NOT route
3156 // through this resolver — doctor only compares lock ages and never
3157 // acquires a lock, so an operator-explicit `Some(0)` is a valid
3158 // "treat every lock as stale" request and is honoured.
3159
/// An explicit positive TTL is returned verbatim by the resolver.
#[test]
fn resolve_lock_ttl_some_positive_returns_unchanged() {
    // Positive inputs never consult the environment, so this test can
    // run alongside the env-mutating one. The samples are the smallest
    // valid value, an ordinary value, and the `u64` ceiling; the last
    // records that no upper bound is imposed here because the
    // downstream duration conversion saturates at `i64::MAX`.
    for explicit in [1, 120, u64::MAX] {
        assert_eq!(resolve_lock_ttl_seconds(Some(explicit)), explicit);
    }
}
3172
3173 // --- FORMAT key write via perform_push_under_lock --------------------
3174
3175 /// Helper: run `perform_push_under_lock` against a temporary bundle file
3176 /// so we can assert on the resulting store state.
3177 async fn push_under_lock_with_bundle(
3178 store: &MockStore,
3179 prefix: Option<&str>,
3180 engine: StorageEngine,
3181 ) -> PushOutcome {
3182 let r = rn("refs/heads/main");
3183 let temp_dir = tempfile::Builder::new()
3184 .prefix("test_push_")
3185 .tempdir()
3186 .unwrap();
3187 let bundle_path = temp_dir.path().join("bundle");
3188 std::fs::write(&bundle_path, b"fake bundle").unwrap();
3189
3190 let state = PushReadyState {
3191 remote_ref: r,
3192 local_sha: Sha::from_hex(SHA).unwrap(),
3193 pre_existing: None,
3194 bundle_path,
3195 zip_artifacts: None,
3196 engine,
3197 force: false,
3198 pre_existing_was_ancestor: true,
3199 local_spec: "refs/heads/main".to_owned(),
3200 hidden_bundles: HashSet::new(),
3201 _temp_dir: temp_dir,
3202 };
3203
3204 perform_push_under_lock(store, prefix, BackendKind::S3, state)
3205 .await
3206 .unwrap()
3207 }
3208
3209 #[tokio::test]
3210 async fn perform_push_under_lock_writes_format_key_on_first_push() {
3211 let store = MockStore::new();
3212 let outcome =
3213 push_under_lock_with_bundle(&store, Some("repo"), StorageEngine::Bundle).await;
3214 assert!(
3215 matches!(outcome, PushOutcome::Ok { .. }),
3216 "expected Ok outcome"
3217 );
3218 assert!(
3219 store.contains("repo/FORMAT"),
3220 "FORMAT key must be written on the first push",
3221 );
3222 let content = store.get_bytes("repo/FORMAT").await.unwrap();
3223 assert_eq!(content.as_ref(), b"bundle");
3224 }
3225
3226 #[tokio::test]
3227 async fn perform_push_under_lock_writes_format_key_without_prefix() {
3228 let store = MockStore::new();
3229 let outcome = push_under_lock_with_bundle(&store, None, StorageEngine::Bundle).await;
3230 assert!(
3231 matches!(outcome, PushOutcome::Ok { .. }),
3232 "expected Ok outcome"
3233 );
3234 assert!(
3235 store.contains("FORMAT"),
3236 "FORMAT key must be written at root when no prefix",
3237 );
3238 let content = store.get_bytes("FORMAT").await.unwrap();
3239 assert_eq!(content.as_ref(), b"bundle");
3240 }
3241
3242 #[tokio::test]
3243 async fn perform_push_under_lock_format_key_is_idempotent() {
3244 // If FORMAT already exists (second push), put_if_absent is a no-op.
3245 // Pre-insert with a trailing newline so the original bytes differ from
3246 // what the push would write — a plain `put` would overwrite to
3247 // b"bundle", while put_if_absent must preserve b"bundle\n".
3248 let store = MockStore::new();
3249 store.insert("repo/FORMAT", Bytes::from_static(b"bundle\n"));
3250 let outcome =
3251 push_under_lock_with_bundle(&store, Some("repo"), StorageEngine::Bundle).await;
3252 assert!(
3253 matches!(outcome, PushOutcome::Ok { .. }),
3254 "expected Ok outcome"
3255 );
3256 // Original content preserved — put_if_absent did not overwrite.
3257 let content = store.get_bytes("repo/FORMAT").await.unwrap();
3258 assert_eq!(content.as_ref(), b"bundle\n");
3259 }
3260
3261 // --- stale-remote guard -------------------------------------------
3262
3263 /// Build a `PushReadyState` with a specific `pre_existing` key and a
3264 /// matching `bundle_path` on disk. The store is left in whatever state
3265 /// the caller configured (e.g. with a pre-seeded bundle key); the
3266 /// caller already knows which prefix it seeded under, so this helper
3267 /// does not need to take it.
3268 fn push_state_with_pre_existing(pre_existing: Option<String>) -> PushReadyState {
3269 let r = rn("refs/heads/main");
3270 let temp_dir = tempfile::Builder::new()
3271 .prefix("test_push_")
3272 .tempdir()
3273 .unwrap();
3274 let bundle_path = temp_dir.path().join("bundle");
3275 std::fs::write(&bundle_path, b"fake bundle").unwrap();
3276 PushReadyState {
3277 remote_ref: r,
3278 local_sha: Sha::from_hex(SHA).unwrap(),
3279 pre_existing,
3280 bundle_path,
3281 zip_artifacts: None,
3282 engine: StorageEngine::Bundle,
3283 force: false,
3284 pre_existing_was_ancestor: true,
3285 local_spec: "refs/heads/main".to_owned(),
3286 hidden_bundles: HashSet::new(),
3287 _temp_dir: temp_dir,
3288 }
3289 }
3290
3291 /// `pre_existing=None`, `current_key=Some(...)`: a concurrent push
3292 /// created a bundle after our pre-lock list but before we acquired
3293 /// the lock.
3294 ///
3295 /// The seeded `existing_key` deliberately uses a SHA distinct from
3296 /// the local push's `local_sha` (= `SHA`). If they matched, a
3297 /// regression that compared `current_key` against `bundle_dest`
3298 /// (the key derived from `local_sha`) instead of against
3299 /// `pre_existing` could pass for the wrong reason — the keys
3300 /// happen to align. With distinct SHAs the only way the test
3301 /// passes is the correct comparison: `pre_existing(None) !=
3302 /// current_key(Some)`.
3303 #[tokio::test]
3304 async fn perform_push_under_lock_rejects_none_to_some_stale_remote() {
3305 let store = MockStore::new();
3306 let existing_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3307 store.insert(&existing_key, Bytes::from_static(b"old bundle"));
3308 let state = push_state_with_pre_existing(None);
3309 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3310 .await
3311 .unwrap();
3312 assert!(
3313 matches!(
3314 &outcome,
3315 PushOutcome::Error { message, .. }
3316 if message == r#""stale remote. Please fetch and retry."?"#
3317 ),
3318 "expected stale-remote error, got {outcome:?}",
3319 );
3320 }
3321
3322 /// `pre_existing=Some(key)`, `current_key=None`: the bundle was
3323 /// deleted between our pre-lock list and the lock acquisition (e.g.
3324 /// a concurrent `git push :<ref>` delete).
3325 #[tokio::test]
3326 async fn perform_push_under_lock_rejects_some_to_none_stale_remote() {
3327 let store = MockStore::new();
3328 let old_key = format!("repo/refs/heads/main/{SHA}.bundle");
3329 let state = push_state_with_pre_existing(Some(old_key.clone()));
3330 // Store is empty — the previously-seen bundle is gone.
3331 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3332 .await
3333 .unwrap();
3334 assert!(
3335 matches!(
3336 &outcome,
3337 PushOutcome::Error { message, .. }
3338 if message == r#""stale remote. Please fetch and retry."?"#
3339 ),
3340 "expected stale-remote error, got {outcome:?}",
3341 );
3342 }
3343
3344 /// `pre_existing=Some(key_a)`, `current_key=Some(key_b)` where
3345 /// `key_a != key_b`: a concurrent push replaced our bundle.
3346 #[tokio::test]
3347 async fn perform_push_under_lock_rejects_replaced_bundle_stale_remote() {
3348 let store = MockStore::new();
3349 let old_sha = SHA;
3350 let new_sha = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
3351 let old_key = format!("repo/refs/heads/main/{old_sha}.bundle");
3352 let new_key = format!("repo/refs/heads/main/{new_sha}.bundle");
3353 // Under-lock the store shows the *new* key.
3354 store.insert(&new_key, Bytes::from_static(b"new bundle"));
3355 let state = push_state_with_pre_existing(Some(old_key));
3356 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3357 .await
3358 .unwrap();
3359 assert!(
3360 matches!(
3361 &outcome,
3362 PushOutcome::Error { message, .. }
3363 if message == r#""stale remote. Please fetch and retry."?"#
3364 ),
3365 "expected stale-remote error, got {outcome:?}",
3366 );
3367 }
3368
3369 /// Under-lock re-listing sees two bundles for the same ref (two clients
3370 /// raced and both uploaded before either acquired the lock). Must return
3371 /// the under-lock duplicate-bundle error before reaching the stale-remote
3372 /// guard or the upload.
3373 #[tokio::test]
3374 async fn perform_push_under_lock_rejects_two_bundles_seen_under_lock() {
3375 let store = MockStore::new();
3376 let sha_a = "1111111111111111111111111111111111111111";
3377 let sha_b = "2222222222222222222222222222222222222222";
3378 store.insert(
3379 format!("repo/refs/heads/main/{sha_a}.bundle"),
3380 Bytes::from_static(b"bundle_a"),
3381 );
3382 store.insert(
3383 format!("repo/refs/heads/main/{sha_b}.bundle"),
3384 Bytes::from_static(b"bundle_b"),
3385 );
3386 // Pre-lock snapshot saw zero bundles; under-lock sees two.
3387 let state = push_state_with_pre_existing(None);
3388 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3389 .await
3390 .unwrap();
3391 assert!(
3392 matches!(
3393 &outcome,
3394 PushOutcome::Error { message, .. }
3395 if message == r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#
3396 ),
3397 "expected under-lock multi-bundle error, got {outcome:?}",
3398 );
3399 // Neither bundle was overwritten or deleted.
3400 assert!(store.contains(&format!("repo/refs/heads/main/{sha_a}.bundle")));
3401 assert!(store.contains(&format!("repo/refs/heads/main/{sha_b}.bundle")));
3402 }
3403
3404 /// Stale-remote happy path: `pre_existing == current_key`. The four
3405 /// drift scenarios above all assert that mismatch produces an error.
3406 /// This case asserts that the *match* path goes through to a normal
3407 /// `Ok(remote_ref)` push outcome — without it, a regression that
3408 /// flipped the comparison to always-mismatch would pass every drift
3409 /// test and silently break every real push.
3410 ///
3411 /// `pre_existing` is seeded under SHA distinct from `local_sha`
3412 /// (mirroring the `rejects_none_to_some` hardening): so a buggy
3413 /// regression that compared `current_key` against `bundle_dest`
3414 /// (the key derived from `local_sha`) instead of against
3415 /// `pre_existing` cannot pass for the wrong reason — the keys
3416 /// are deliberately different. The push must:
3417 /// 1. produce `Ok(remote_ref)`
3418 /// 2. upload the new bundle at `bundle_dest` (SHA = `local_sha`)
3419 /// 3. write a baseline tombstone naming `OTHER_SHA` (issue #157
3420 /// defers the prior-bundle delete to `gc sweep`) and leave
3421 /// the prior bundle in place until grace expires
3422 /// 4. write `HEAD` and `FORMAT`
3423 #[tokio::test]
3424 async fn perform_push_under_lock_passes_through_when_pre_existing_matches_current() {
3425 let store = MockStore::new();
3426 let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3427 // Seed under `OTHER_SHA` so under-lock list returns this key.
3428 // The local push's `local_sha` is `SHA`, so `bundle_dest` is a
3429 // DIFFERENT key. The stale-remote check passes (pre_existing ==
3430 // current_key, both = pre_key); the function then uploads the
3431 // new bundle at `bundle_dest` and writes a baseline tombstone
3432 // naming the old bundle's SHA.
3433 store.insert(&pre_key, Bytes::from_static(b"old bundle"));
3434 let state = push_state_with_pre_existing(Some(pre_key.clone()));
3435 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3436 .await
3437 .unwrap();
3438 assert!(
3439 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3440 "expected Ok(refs/heads/main), got {outcome:?}",
3441 );
3442 // The new bundle must land at the bundle_dest derived from
3443 // local_sha, not at pre_key.
3444 let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3445 let new_bytes = store
3446 .get_bytes(&bundle_dest)
3447 .await
3448 .expect("new bundle must be uploaded at bundle_dest");
3449 assert_eq!(
3450 new_bytes.as_ref(),
3451 b"fake bundle",
3452 "new bundle must contain the local payload",
3453 );
3454 // Issue #157: the old bundle must remain readable — fetchers
3455 // that advertised it via an earlier `list` need the grace
3456 // window. `gc sweep` will reclaim it later.
3457 assert!(
3458 store.contains(&pre_key),
3459 "old bundle at pre_key must survive the push (deferred via tombstone)",
3460 );
3461 // A single baseline tombstone naming the prior SHA must exist.
3462 let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
3463 let metas = store.list("repo/gc/").await.unwrap();
3464 let tombstones: Vec<_> = metas
3465 .iter()
3466 .filter(|m| m.key.starts_with(&tomb_listing))
3467 .collect();
3468 assert_eq!(
3469 tombstones.len(),
3470 1,
3471 "exactly one baseline tombstone must be written; got keys: {:?}",
3472 tombstones.iter().map(|m| &m.key).collect::<Vec<_>>(),
3473 );
3474 let body = store.get_bytes(&tombstones[0].key).await.unwrap();
3475 let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
3476 assert_eq!(
3477 parsed["ref_name"].as_str(),
3478 Some("refs/heads/main"),
3479 "tombstone must name the pushed ref",
3480 );
3481 assert_eq!(
3482 parsed["sha"].as_str(),
3483 Some(OTHER_SHA),
3484 "tombstone must name the prior SHA so `gc sweep` reclaims OTHER_SHA.bundle",
3485 );
3486 assert!(
3487 store.contains("repo/FORMAT"),
3488 "FORMAT key must be written by the push",
3489 );
3490 assert!(
3491 store.contains("repo/HEAD"),
3492 "HEAD key must be written by the push",
3493 );
3494 }
3495
3496 /// Issue #121 + #157 regression: the prior-bundle cleanup must
3497 /// never fail the push. The new bundle is already durable;
3498 /// reporting failure misrepresents the remote state. Match the
3499 /// `compact` / `force_push_baseline_cleanup` best-effort contract:
3500 /// log at warn and report success.
3501 ///
3502 /// Under issue #157 the preferred cleanup path is a baseline
3503 /// tombstone (deferred reclamation). When the tombstone PUT
3504 /// itself fails, the fallback synchronous delete runs — so this
3505 /// test arms a fault on BOTH the tombstone PUT and the prior
3506 /// bundle's delete to exercise the worst-case orphan path. The
3507 /// two-bundle state remains on the bucket so the next push's
3508 /// under-lock multi-bundle guard surfaces it to the operator.
3509 #[tokio::test]
3510 async fn perform_push_under_lock_succeeds_when_prior_cleanup_fails() {
3511 use crate::object_store::mock::Fault;
3512 let store = MockStore::new();
3513 let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3514 store.insert(&pre_key, Bytes::from_static(b"old bundle"));
3515 // Fail the tombstone PUT (any key under the baseline-tomb
3516 // namespace) AND the fallback synchronous delete of the
3517 // prior bundle. The push must still report Ok.
3518 store.arm(Fault::NetworkOnPutBytesPrefix {
3519 prefix: baseline_tombstone_listing_prefix(Some("repo")),
3520 });
3521 store.arm(Fault::NetworkOnDelete {
3522 key: pre_key.clone(),
3523 });
3524
3525 let state = push_state_with_pre_existing(Some(pre_key.clone()));
3526 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3527 .await
3528 .expect("push must succeed even when prior-bundle cleanup fails");
3529 assert!(
3530 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3531 "expected Ok(refs/heads/main), got {outcome:?}",
3532 );
3533
3534 let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3535 assert!(
3536 store.contains(&bundle_dest),
3537 "new bundle must be uploaded at bundle_dest",
3538 );
3539 // Both faults fired: tombstone PUT first, fallback delete
3540 // second. None should remain pending.
3541 assert_eq!(store.pending_faults(), 0);
3542 // Orphan remains on the bucket — operator sees the warn log and
3543 // the next push's multi-bundle guard will direct them to doctor.
3544 assert!(
3545 store.contains(&pre_key),
3546 "cleanup faults must leave the prior bundle in place",
3547 );
3548 // No tombstone was written (the PUT failed); the operator path
3549 // is the multi-bundle guard, not deferred reclamation.
3550 let metas = store.list("repo/gc/").await.unwrap();
3551 let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
3552 assert!(
3553 !metas.iter().any(|m| m.key.starts_with(&tomb_listing)),
3554 "no baseline tombstone must remain after a failed PUT",
3555 );
3556 }
3557
3558 /// Issue #157 fallback path: when the baseline tombstone PUT fails
3559 /// but the synchronous delete succeeds, the prior bundle is
3560 /// reclaimed immediately. This is the recovery shape that
3561 /// preserves the issue #121 "two-bundle state surfaces via doctor"
3562 /// invariant — without the fallback delete, a tombstone PUT
3563 /// failure would orphan the prior bundle indefinitely (gc sweep
3564 /// has no tombstone to act on).
3565 #[tokio::test]
3566 async fn perform_push_under_lock_falls_back_to_sync_delete_on_tombstone_put_failure() {
3567 use crate::object_store::mock::Fault;
3568 let store = MockStore::new();
3569 let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3570 store.insert(&pre_key, Bytes::from_static(b"old bundle"));
3571 // Fail only the tombstone PUT — the prior-bundle delete is
3572 // left armable-free so the fallback path completes.
3573 store.arm(Fault::NetworkOnPutBytesPrefix {
3574 prefix: baseline_tombstone_listing_prefix(Some("repo")),
3575 });
3576
3577 let state = push_state_with_pre_existing(Some(pre_key.clone()));
3578 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3579 .await
3580 .expect("push must succeed when tombstone PUT fails but fallback delete succeeds");
3581 assert!(
3582 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3583 "expected Ok(refs/heads/main), got {outcome:?}",
3584 );
3585
3586 // The fault on the tombstone PUT fired; no tombstone remains.
3587 assert_eq!(store.pending_faults(), 0);
3588 let metas = store.list("repo/gc/").await.unwrap();
3589 let tomb_listing = baseline_tombstone_listing_prefix(Some("repo"));
3590 assert!(
3591 !metas.iter().any(|m| m.key.starts_with(&tomb_listing)),
3592 "tombstone PUT failed, so no baseline-tomb key may exist",
3593 );
3594 // The fallback synchronous delete succeeded — prior bundle gone.
3595 assert!(
3596 !store.contains(&pre_key),
3597 "fallback synchronous delete must reclaim the prior bundle",
3598 );
3599 // The new bundle is durable.
3600 let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3601 assert!(
3602 store.contains(&bundle_dest),
3603 "new bundle must be uploaded at bundle_dest",
3604 );
3605 }
3606
3607 /// Issue #127 regression: a non-`NotFound` error on the optional
3608 /// zip artifact upload after the new bundle has been put must NOT
3609 /// fail the push. The bundle, `HEAD`, and `FORMAT` are already
3610 /// durable; reporting failure misrepresents the remote state. The
3611 /// zip is a CodePipeline-side convenience surface — bundle
3612 /// availability is what determines whether `git clone`/`fetch`
3613 /// work. Match the `delete_prior_bundle_best_effort` contract: log
3614 /// at warn and report success.
3615 #[tokio::test]
3616 async fn perform_push_under_lock_succeeds_when_zip_upload_fails() {
3617 use crate::object_store::mock::Fault;
3618 let store = MockStore::new();
3619
3620 // Build a PushReadyState with zip_artifacts present so the
3621 // zip-upload block runs. The archive file is written to disk
3622 // so a future refactor that re-orders the fault check vs. the
3623 // file read still has a real file to act on.
3624 let r = rn("refs/heads/main");
3625 let temp_dir = tempfile::Builder::new()
3626 .prefix("test_push_")
3627 .tempdir()
3628 .unwrap();
3629 let bundle_path = temp_dir.path().join("bundle");
3630 std::fs::write(&bundle_path, b"fake bundle").unwrap();
3631
3632 let archive_tempdir = tempfile::Builder::new()
3633 .prefix("test_zip_")
3634 .tempdir()
3635 .unwrap();
3636 let archive_path = archive_tempdir.path().join("repo.zip");
3637 std::fs::write(&archive_path, b"fake zip body").unwrap();
3638
3639 let state = PushReadyState {
3640 remote_ref: r,
3641 local_sha: Sha::from_hex(SHA).unwrap(),
3642 pre_existing: None,
3643 bundle_path,
3644 zip_artifacts: Some(ZipArtifacts {
3645 archive_path,
3646 short_sha: "deadbeef".to_owned(),
3647 commit_msg: "test commit".to_owned(),
3648 _tempdir: archive_tempdir,
3649 }),
3650 engine: StorageEngine::Bundle,
3651 force: false,
3652 pre_existing_was_ancestor: true,
3653 local_spec: "refs/heads/main".to_owned(),
3654 hidden_bundles: HashSet::new(),
3655 _temp_dir: temp_dir,
3656 };
3657
3658 let zip_dest = "repo/refs/heads/main/repo.zip".to_owned();
3659 store.arm(Fault::NetworkOnPutPath {
3660 key: zip_dest.clone(),
3661 });
3662
3663 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
3664 .await
3665 .expect("push must succeed even when zip upload fails");
3666 assert!(
3667 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
3668 "expected Ok(refs/heads/main), got {outcome:?}",
3669 );
3670
3671 // The bundle reached the bucket — that is the git-protocol
3672 // contract for a successful push.
3673 let bundle_dest = format!("repo/refs/heads/main/{SHA}.bundle");
3674 assert!(
3675 store.contains(&bundle_dest),
3676 "new bundle must be uploaded at bundle_dest",
3677 );
3678 // The zip fault fired exactly once — proves put_path was
3679 // attempted and failed.
3680 assert_eq!(store.pending_faults(), 0);
3681 // The zip key is absent — proves the failure was not silently
3682 // swallowed by a retry that masked the regression.
3683 assert!(
3684 !store.contains(&zip_dest),
3685 "zip key must be absent when the upload fault fires",
3686 );
3687 }
3688
3689 /// Issue #161: on S3 the zip-artifact upload must carry the
3690 /// `codepipeline-artifact-revision-summary` user-metadata header so
3691 /// AWS `CodePipeline` can consume the commit summary. On Azure the
3692 /// same hyphenated key is rejected by the service (metadata names
3693 /// must be valid C# identifiers), and the issue #127 swallow path
3694 /// then hides the upload failure — silently dropping every zip
3695 /// artifact. The fix only emits the metadata on S3; this pair of
3696 /// tests pins both halves of the contract against `MockStore`,
3697 /// which records `user_metadata` verbatim and lets us inspect it
3698 /// without standing up a live backend.
3699 #[tokio::test]
3700 async fn perform_push_under_lock_emits_codepipeline_metadata_on_s3() {
3701 let (store, zip_dest) = run_zip_push(BackendKind::S3).await;
3702 let meta = store.metadata(&zip_dest).expect("zip stored");
3703 let summary = meta
3704 .user_metadata
3705 .iter()
3706 .find(|(k, _)| k == "codepipeline-artifact-revision-summary")
3707 .expect("S3 push must attach the CodePipeline revision-summary metadata");
3708 assert_eq!(summary.1, "test commit");
3709 }
3710
3711 #[tokio::test]
3712 async fn perform_push_under_lock_omits_codepipeline_metadata_on_azure() {
3713 let (store, zip_dest) = run_zip_push(BackendKind::Azure).await;
3714 let meta = store.metadata(&zip_dest).expect("zip stored");
3715 assert!(
3716 meta.user_metadata.is_empty(),
3717 "Azure push must not attach hyphenated CodePipeline metadata; \
3718 got {entries:?}",
3719 entries = meta.user_metadata,
3720 );
3721 }
3722
3723 /// Drive a single `?zip=1` push through `perform_push_under_lock` for
3724 /// the given backend kind, returning the store and the zip key so
3725 /// each test can assert on `user_metadata` independently. Centralised
3726 /// here so an accidental drift between the S3 and Azure variants
3727 /// (different `commit_msg`, different prefix, different ref) cannot
3728 /// hide a regression in the metadata wiring.
3729 async fn run_zip_push(kind: BackendKind) -> (MockStore, String) {
3730 let store = MockStore::new();
3731 let r = rn("refs/heads/main");
3732 let temp_dir = tempfile::Builder::new()
3733 .prefix("test_push_")
3734 .tempdir()
3735 .unwrap();
3736 let bundle_path = temp_dir.path().join("bundle");
3737 std::fs::write(&bundle_path, b"fake bundle").unwrap();
3738
3739 let archive_tempdir = tempfile::Builder::new()
3740 .prefix("test_zip_")
3741 .tempdir()
3742 .unwrap();
3743 let archive_path = archive_tempdir.path().join("repo.zip");
3744 std::fs::write(&archive_path, b"fake zip body").unwrap();
3745
3746 let state = PushReadyState {
3747 remote_ref: r,
3748 local_sha: Sha::from_hex(SHA).unwrap(),
3749 pre_existing: None,
3750 bundle_path,
3751 zip_artifacts: Some(ZipArtifacts {
3752 archive_path,
3753 short_sha: "deadbeef".to_owned(),
3754 commit_msg: "test commit".to_owned(),
3755 _tempdir: archive_tempdir,
3756 }),
3757 engine: StorageEngine::Bundle,
3758 force: false,
3759 pre_existing_was_ancestor: true,
3760 local_spec: "refs/heads/main".to_owned(),
3761 hidden_bundles: HashSet::new(),
3762 _temp_dir: temp_dir,
3763 };
3764
3765 let outcome = perform_push_under_lock(&store, Some("repo"), kind, state)
3766 .await
3767 .expect("push must succeed");
3768 assert!(matches!(outcome, PushOutcome::Ok { .. }));
3769 let zip_dest = "repo/refs/heads/main/repo.zip".to_owned();
3770 assert!(
3771 store.contains(&zip_dest),
3772 "zip artifact must land on bucket"
3773 );
3774 (store, zip_dest)
3775 }
3776
3777 // --- delete_remote_ref_under_lock ---------------------------------
3778
3779 /// Issue #133: under the lock, a ref whose only remaining object is
3780 /// the lock key itself reports `"not found"?` rather than the
3781 /// pre-#133 quirk of treating the lock as a bundle. In production
3782 /// the lock here is the one [`acquire_lock`] holds across the call;
3783 /// `release_lock` deletes it after this function returns, so the
3784 /// caller never sees a dangling lock either way.
3785 ///
3786 /// This pins the new contract: the filter on the lock key must
3787 /// short-circuit to `"not found"?` when the lock is the only
3788 /// listed entry, NOT delete it as a bundle.
3789 #[tokio::test]
3790 async fn delete_remote_ref_under_lock_reports_not_found_when_only_lock_present() {
3791 let store = MockStore::new();
3792 let lock_key = "repo/refs/heads/main/LOCK#.lock";
3793 // Simulate the lock we hold across the call (the production
3794 // caller has already acquired it via `acquire_lock`).
3795 store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
3796 let r = rn("refs/heads/main");
3797
3798 let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
3799 .await
3800 .unwrap();
3801
3802 match outcome {
3803 PushOutcome::Error { message, .. } => {
3804 assert_eq!(message, r#""not found"?"#);
3805 }
3806 PushOutcome::Ok { .. } => panic!("expected Error, got Ok"),
3807 }
3808 // The lock key is NOT swept by `delete_remote_ref_under_lock` —
3809 // `release_lock` is responsible for removing it. Pin that here
3810 // so a regression that swept the held lock would fail.
3811 assert!(
3812 store.contains(lock_key),
3813 "delete_remote_ref_under_lock must NOT delete the held lock key",
3814 );
3815 }
3816
3817 /// Issue #133: when a concurrent push lands a NEW bundle between
3818 /// the lock-acquire and our listing inside the lock, the post-lock
3819 /// listing reflects the new bundle. The delete proceeds normally
3820 /// against that bundle (and the lock is filtered out). This pins
3821 /// the close of the race window the issue describes: the listing
3822 /// is now under the lock, so the deletion target is whatever the
3823 /// concurrent writer left behind, not a stale pre-lock snapshot.
3824 #[tokio::test]
3825 async fn delete_remote_ref_under_lock_sweeps_concurrently_landed_bundle() {
3826 let store = MockStore::new();
3827 let r = rn("refs/heads/main");
3828 let lock_key = "repo/refs/heads/main/LOCK#.lock";
3829 // Concurrent push landed this bundle; we hold the lock now.
3830 let bundle = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
3831 store.insert(&bundle, Bytes::from_static(b"new"));
3832 store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
3833
3834 let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
3835 .await
3836 .unwrap();
3837
3838 assert_eq!(
3839 outcome,
3840 PushOutcome::Ok {
3841 remote_ref: "refs/heads/main".into()
3842 }
3843 );
3844 assert!(
3845 !store.contains(&bundle),
3846 "concurrently-landed bundle must be swept by the under-lock listing",
3847 );
3848 assert!(
3849 store.contains(lock_key),
3850 "held lock must survive the sweep (release_lock removes it)",
3851 );
3852 }
3853
3854 /// Stale lock is deleted but another client re-acquires it before our
3855 /// retry `put_if_absent`. Must return `Ok(None)` — the caller maps
3856 /// this to a "lock held" user error, not a hard failure.
3857 #[tokio::test]
3858 async fn acquire_lock_stale_retry_loses_second_race() {
3859 use crate::object_store::mock::Fault;
3860 let store = MockStore::new();
3861 let now = OffsetDateTime::now_utc();
3862 let stale = now - Duration::seconds(120);
3863 store.insert_with("k", Bytes::new(), stale, PutOpts::default());
3864 // Another client wins the race between our delete and retry.
3865 store.arm(Fault::ContendedPutIfAbsent { key: "k".into() });
3866 let arc = Arc::new(store);
3867 let guard = acquire_lock(
3868 Arc::clone(&arc) as Arc<dyn ObjectStore>,
3869 "k",
3870 Duration::seconds(60),
3871 now,
3872 )
3873 .await
3874 .unwrap();
3875 assert!(guard.is_none(), "expected contention on the retry race");
3876 // Fault fired — confirms the retry put_if_absent was called.
3877 assert_eq!(arc.pending_faults(), 0);
3878 // The stale lock was removed; no key remains.
3879 assert!(!arc.contains("k"));
3880 }
3881
3882 // --- full_error_chain dedup --------------------------------------
3883
/// Guards the dedup-by-suffix behaviour of `full_error_chain`. Both
/// `PushError::Store` and `ObjectStoreError::Network` already inline
/// their immediate source through `{0}` in their `Display` derive, so a
/// plain walk of the source chain would repeat the tail. The shared
/// `super::append_source_chain` helper skips any level whose text is
/// already the tail of the message. If the always-append walk came
/// back, the output would read
/// `"…network error: dns failure: dns failure"` (longer still for
/// deeper chains) and this byte-exact comparison would fail.
#[test]
fn full_error_chain_deduplicates_inlined_source_text() {
    let expected = "object-store error during push: network error: dns failure";

    let source: crate::object_store::BoxError = Box::new(std::io::Error::other("dns failure"));
    let push_error = PushError::Store(ObjectStoreError::Network(source));
    let rendered = full_error_chain(&push_error);

    assert_eq!(
        rendered, expected,
        "PushError::Store(Network(_)) must not duplicate the inner source",
    );
}
3904
3905 // --- bundle progress sink wiring (issue #55) -----------------------
3906
3907 /// Decorator around `MockStore` that records, for every `put_path`
3908 /// call, whether `opts.progress` was `Some`. Used to pin the
3909 /// "bundle uploads attach a progress sink" contract from issue
3910 /// #55: a regression that drops the sink from `perform_push_under_lock`
3911 /// would silently regress the only thing `git push` users have to
3912 /// watch a multi-GiB transfer.
3913 ///
3914 /// The decorator forwards every other method to the inner
3915 /// `MockStore` unchanged. Wrapping for the assertion-of-interest
3916 /// alone keeps the test tightly scoped — there is no fault
3917 /// injection, no chunking knob, just a Vec of "did `put_path` get
3918 /// a sink?" booleans keyed by the destination key.
3919 #[derive(Default)]
3920 struct RecordingPutPathStore {
3921 inner: MockStore,
3922 put_path_progress_seen: std::sync::Mutex<Vec<(String, bool)>>,
3923 }
3924
3925 impl RecordingPutPathStore {
3926 fn observed(&self) -> Vec<(String, bool)> {
3927 self.put_path_progress_seen
3928 .lock()
3929 .expect("observation lock")
3930 .clone()
3931 }
3932 }
3933
3934 #[async_trait::async_trait]
3935 impl ObjectStore for RecordingPutPathStore {
3936 async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
3937 self.inner.list(prefix).await
3938 }
3939 async fn get_to_file(
3940 &self,
3941 key: &str,
3942 dest: &Path,
3943 opts: crate::object_store::GetOpts,
3944 ) -> Result<(), ObjectStoreError> {
3945 self.inner.get_to_file(key, dest, opts).await
3946 }
3947 async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
3948 self.inner.get_bytes(key).await
3949 }
3950 async fn get_bytes_range(
3951 &self,
3952 key: &str,
3953 range: std::ops::Range<u64>,
3954 ) -> Result<Bytes, ObjectStoreError> {
3955 self.inner.get_bytes_range(key, range).await
3956 }
3957 async fn put_bytes(
3958 &self,
3959 key: &str,
3960 body: Bytes,
3961 opts: PutOpts,
3962 ) -> Result<(), ObjectStoreError> {
3963 self.inner.put_bytes(key, body, opts).await
3964 }
3965 async fn put_path(
3966 &self,
3967 key: &str,
3968 src: &Path,
3969 opts: PutOpts,
3970 ) -> Result<(), ObjectStoreError> {
3971 self.put_path_progress_seen
3972 .lock()
3973 .expect("observation lock")
3974 .push((key.to_owned(), opts.progress.is_some()));
3975 self.inner.put_path(key, src, opts).await
3976 }
3977 async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
3978 self.inner.put_if_absent(key, body).await
3979 }
3980 async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
3981 self.inner.head(key).await
3982 }
3983 async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
3984 self.inner.copy(src, dst).await
3985 }
3986 async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
3987 self.inner.delete(key).await
3988 }
3989 }
3990
3991 /// `perform_push_under_lock` must attach a `ProgressSink` to the
3992 /// bundle `put_path` so `git push` users see motion during a slow
3993 /// upload (issue #55). Before the fix, this call passed
3994 /// `PutOpts::default()` and the user saw nothing for hours on a
3995 /// 20 GiB push.
3996 #[tokio::test]
3997 async fn perform_push_under_lock_attaches_progress_sink_to_bundle_put_path() {
3998 let store = RecordingPutPathStore::default();
3999 let r = rn("refs/heads/main");
4000 let temp_dir = tempfile::Builder::new()
4001 .prefix("test_push_progress_")
4002 .tempdir()
4003 .unwrap();
4004 let bundle_path = temp_dir.path().join("bundle");
4005 std::fs::write(&bundle_path, b"fake bundle").unwrap();
4006 let state = PushReadyState {
4007 remote_ref: r,
4008 local_sha: Sha::from_hex(SHA).unwrap(),
4009 pre_existing: None,
4010 bundle_path,
4011 zip_artifacts: None,
4012 engine: StorageEngine::Bundle,
4013 force: false,
4014 pre_existing_was_ancestor: true,
4015 local_spec: "refs/heads/main".to_owned(),
4016 hidden_bundles: HashSet::new(),
4017 _temp_dir: temp_dir,
4018 };
4019 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4020 .await
4021 .unwrap();
4022 assert!(
4023 matches!(outcome, PushOutcome::Ok { .. }),
4024 "expected Ok outcome",
4025 );
4026 let observed = store.observed();
4027 // Exactly one `put_path` call (no zip artifacts in this fixture).
4028 // Pinning ≥1 rather than ==1 guards against future test
4029 // refactors that legitimately add another upload — the contract
4030 // we care about is that *every* bundle-class upload carries a
4031 // sink.
4032 assert!(
4033 !observed.is_empty(),
4034 "perform_push_under_lock must call put_path for the bundle",
4035 );
4036 for (key, has_sink) in &observed {
4037 assert!(
4038 has_sink,
4039 "put_path for `{key}` must carry a ProgressSink (issue #55)",
4040 );
4041 }
4042 }
4043
/// Smoke test for the public shape of `bundle_progress_sink`: it must
/// take any sequence of `report(amount)` calls without panicking, and
/// it must accept `None` for `total` (the stat-failed fallback).
///
/// If `total` were changed from `Option<u64>` to a plain `u64`, every
/// caller would have to handle stat errors itself, contradicting the
/// "graceful degradation" note in the helper's doc comment; this
/// test would then stop compiling.
#[test]
fn bundle_progress_sink_accepts_reports_without_panicking() {
    // Known total: three reports that add up to it.
    let sized = bundle_progress_sink("repo/bundle.bundle", Some(1_024));
    for amount in [256, 256, 512] {
        sized.report(amount);
    }

    // Unknown total: a tiny report, then the largest possible one.
    let unsized_sink = bundle_progress_sink("repo/bundle.bundle", None);
    unsized_sink.report(1);
    unsized_sink.report(u64::MAX); // saturates rather than wraps
}
4062
/// Rust-level pin for the not-ancestor wire token.
///
/// The shellspec suites (`spec/integration/{s3,az}/force_push_spec.sh`,
/// `spec/live/s3/force_push_spec.sh`) assert on the literal substring
/// `"not ancestor"`, but they run only through the `make shellspec-*`
/// targets and not the default `cargo test --workspace` gate. This
/// test is what catches a rename of the constant that forgot the spec
/// files.
#[test]
fn not_ancestor_token_value_is_stable() {
    // 1. The constant itself keeps its exact value.
    assert_eq!(
        NOT_ANCESTOR_TOKEN,
        "not ancestor",
        "spec/{{integration,live}}/*/force_push_spec.sh asserts on this exact substring",
    );

    // 2. A message built in the wire shape embeds the token verbatim.
    let formatted = format!(r#""remote ref is {NOT_ANCESTOR_TOKEN} of refs/heads/main."?"#);
    let embeds_token = formatted.contains(NOT_ANCESTOR_TOKEN);
    assert!(
        embeds_token,
        "the not-ancestor PushOutcome::Error message must embed the token literally; got {formatted:?}",
    );
}
4082
/// Header-injection guard for commit-message summaries.
///
/// A CRLF inside the summary would split the
/// `x-amz-meta-codepipeline-artifact-revision-summary` (or
/// `x-ms-meta-…`) header on the wire, letting a forged commit inject
/// arbitrary user metadata onto the uploaded zip archive.
/// `sanitize_metadata_value` must therefore collapse every ASCII
/// control byte (CR, LF, NUL, HTAB, …) to a space, leaving a
/// single-line value with no injection payload.
#[test]
fn sanitize_metadata_value_strips_control_chars() {
    // (raw input, expected sanitized output)
    let cases = [
        ("hello\r\nX-Injected: yes", "hello X-Injected: yes"),
        ("nul\0byte", "nul byte"),
        ("plain text", "plain text"),
        ("", ""),
    ];
    for (raw, expected) in cases {
        assert_eq!(sanitize_metadata_value(raw), expected);
    }

    // Non-ASCII text is not a control byte and must be left alone.
    assert_eq!(
        sanitize_metadata_value("café — short summary"),
        "café — short summary",
        "non-ASCII printable characters must pass through unchanged",
    );
}
4106
4107 // --- Issue #129: force-push protection check runs under the lock ---
4108
4109 /// Regression for issue #129. If a concurrent `protect` lands a
4110 /// `PROTECTED#` marker between the pre-lock work in `prepare_push`
4111 /// and the lock acquisition in `push_one`, the under-lock arm of
4112 /// `perform_push_under_lock` must observe it and reject a non-FF
4113 /// force-push with the same `NotAncestor` wire token the pre-lock
4114 /// non-force probe would have produced.
4115 ///
4116 /// `pre_existing_was_ancestor = false` represents "this force-push
4117 /// is NOT a fast-forward" — exactly the case the historical
4118 /// protection-demotion logic rejected. The `PROTECTED#` key is
4119 /// seeded after the would-be pre-lock check (we simulate that by
4120 /// constructing the state with `force=true` and seeding the marker
4121 /// alongside the matching pre-existing bundle).
4122 #[tokio::test]
4123 async fn perform_push_under_lock_rejects_force_when_protected_under_lock_and_not_ff() {
4124 let store = MockStore::new();
4125 let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
4126 store.insert(&pre_key, Bytes::from_static(b"old bundle"));
4127 // Concurrent `protect` lands the marker before we get the lock.
4128 store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
4129 let mut state = push_state_with_pre_existing(Some(pre_key.clone()));
4130 state.force = true;
4131 state.pre_existing_was_ancestor = false;
4132 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4133 .await
4134 .unwrap();
4135 assert!(
4136 matches!(
4137 &outcome,
4138 PushOutcome::Error { message, .. }
4139 if message == r#""remote ref is not ancestor of refs/heads/main."?"#
4140 ),
4141 "expected under-lock NotAncestor refusal, got {outcome:?}",
4142 );
4143 // The protection marker must survive — the engine never touches
4144 // protect/unprotect state. A regression that swept it on the
4145 // refusal path would silently downgrade a server's protection.
4146 assert!(store.contains("repo/refs/heads/main/PROTECTED#"));
4147 // The pre-existing bundle must survive intact: a refusal path
4148 // that erroneously progressed past the protection check could
4149 // overwrite or delete it.
4150 let local_sha = SHA;
4151 assert!(store.contains(&pre_key));
4152 assert!(
4153 !store.contains(&format!("repo/refs/heads/main/{local_sha}.bundle")),
4154 "refused push must not upload the new bundle",
4155 );
4156 }
4157
4158 /// Companion to the rejection case: when the user's local tip IS a
4159 /// fast-forward of the pre-existing remote bundle (`pre_existing_was_ancestor
4160 /// = true`), the historical "protected ref + force" semantic is to
4161 /// proceed — protection only blocks non-fast-forward force-pushes.
4162 /// This test pins that branch: a `PROTECTED#` marker under the
4163 /// lock plus a FF push must still succeed.
4164 #[tokio::test]
4165 async fn perform_push_under_lock_allows_force_when_protected_under_lock_but_ff() {
4166 let store = MockStore::new();
4167 let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
4168 store.insert(&pre_key, Bytes::from_static(b"old bundle"));
4169 store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
4170 let mut state = push_state_with_pre_existing(Some(pre_key.clone()));
4171 state.force = true;
4172 state.pre_existing_was_ancestor = true; // FF case.
4173 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4174 .await
4175 .unwrap();
4176 assert!(
4177 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
4178 "FF push must pass even when protected, got {outcome:?}",
4179 );
4180 // Protection marker still in place after the push.
4181 assert!(store.contains("repo/refs/heads/main/PROTECTED#"));
4182 }
4183
4184 /// Companion to the rejection case: a legitimate force-push (force,
4185 /// non-FF, no `PROTECTED#` marker) must proceed. This pins the
4186 /// polarity of the AND-clause guarding the protection rejection — a
4187 /// regression that dropped the `is_protected` check from the
4188 /// condition would refuse every non-FF force-push, not just those
4189 /// against protected refs.
4190 #[tokio::test]
4191 async fn perform_push_under_lock_allows_force_when_not_ancestor_and_not_protected() {
4192 let store = MockStore::new();
4193 let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
4194 store.insert(&pre_key, Bytes::from_static(b"old bundle"));
4195 // No PROTECTED# marker.
4196 let mut state = push_state_with_pre_existing(Some(pre_key.clone()));
4197 state.force = true;
4198 state.pre_existing_was_ancestor = false; // non-FF force-push.
4199 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4200 .await
4201 .unwrap();
4202 assert!(
4203 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
4204 "legitimate non-FF force-push must proceed, got {outcome:?}",
4205 );
4206 // A passing push uploads the new bundle keyed by `local_sha`.
4207 let local_sha = SHA;
4208 assert!(
4209 store.contains(&format!("repo/refs/heads/main/{local_sha}.bundle")),
4210 "new bundle must be uploaded on a successful force-push",
4211 );
4212 // Defensive: the NotAncestor wire token must NOT appear anywhere
4213 // in the outcome — that would mean the guard mis-fired.
4214 if let PushOutcome::Error { message, .. } = &outcome {
4215 assert!(
4216 !message.contains("not ancestor"),
4217 "force-push without protection must not emit NotAncestor: {message}",
4218 );
4219 }
4220 }
4221
4222 /// A non-force push (`force=false`) must never consult `is_protected`
4223 /// under the lock: non-FF non-force pushes were already rejected
4224 /// pre-lock by the ancestry probe, and FF non-force pushes are
4225 /// unaffected by protection. The under-lock check is gated on
4226 /// `force` precisely so this round-trip costs zero extra HEAD calls.
4227 ///
4228 /// Test-design note: `pre_existing_was_ancestor` is forced to `false`
4229 /// here so that ONLY the `force` clause of the under-lock guard
4230 /// (`force && !pre_existing_was_ancestor && is_protected(...)`)
4231 /// keeps the protection check off. A regression that drops the
4232 /// `force &&` clause would flip the guard to true and fail this
4233 /// test; if `pre_existing_was_ancestor` were left as `true`, the
4234 /// `!pre_existing_was_ancestor` clause would short-circuit and
4235 /// hide that regression. The `pre_existing` bundle key is omitted
4236 /// to keep the FF-vs-non-FF semantics consistent with "no prior
4237 /// remote SHA, therefore not an ancestor".
4238 #[tokio::test]
4239 async fn perform_push_under_lock_skips_protection_check_for_non_force() {
4240 let store = MockStore::new();
4241 // Marker present, but a non-force push must not even probe for it.
4242 store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
4243 let mut state = push_state_with_pre_existing(None);
4244 state.force = false;
4245 state.pre_existing_was_ancestor = false;
4246 let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
4247 .await
4248 .unwrap();
4249 assert!(
4250 matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
4251 "non-force push must pass regardless of protection: {outcome:?}",
4252 );
4253 }
4254
4255 // -----------------------------------------------------------------
4256 // Issue #151 — bundle-engine delete must not miss a `PROTECTED#`
4257 // marker written after the under-lock listing. The primary defence
4258 // is the per-ref lock (#159 made `protect`/`unprotect` acquire the
// same key the delete holds). These tests pin the contract of the
// post-sweep defensive verification
// (`verify_no_orphan_protected_after_delete`): it stays silent on the
// happy path and never mutates the bucket.
4262 // -----------------------------------------------------------------
4263
4264 #[tokio::test]
4265 async fn issue_151_clean_delete_passes_post_sweep_verification() {
4266 // Happy path with the lock contract honoured: bundle present,
4267 // no marker, lock held. The sweep deletes the bundle and the
4268 // post-sweep `head(PROTECTED#)` returns NotFound — silently —
4269 // and the delete reports `ok`. A regression that promoted the
4270 // post-sweep probe into a hard error would surface here.
4271 let store = MockStore::new();
4272 let r = rn("refs/heads/main");
4273 let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
4274 let lock_key = "repo/refs/heads/main/LOCK#.lock";
4275 store.insert(&bundle, Bytes::from_static(b"b"));
4276 store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
4277
4278 let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
4279 .await
4280 .unwrap();
4281 assert_eq!(
4282 outcome,
4283 PushOutcome::Ok {
4284 remote_ref: "refs/heads/main".into()
4285 },
4286 "clean delete must report ok after the post-sweep probe",
4287 );
4288 assert!(!store.contains(&bundle), "bundle must be swept");
4289 assert!(
4290 store.contains(lock_key),
4291 "lock survives the sweep (release removes it)",
4292 );
4293 }
4294
4295 /// Helper unit test for [`verify_no_orphan_protected_after_delete`]:
4296 /// the helper itself must not error or panic when the marker is
4297 /// absent, and the call must be cheap (a single `head`). Pinned
4298 /// here so a future refactor of the helper cannot regress its
4299 /// contract — the delete paths rely on this being a no-op on
4300 /// the happy path.
4301 #[tokio::test]
4302 async fn verify_no_orphan_protected_after_delete_is_noop_when_marker_absent() {
4303 use crate::object_store::mock::Fault;
4304 let store = MockStore::new();
4305 let r = rn("refs/heads/main");
4306 // Arm a one-shot transient HEAD fault on the marker key. The
4307 // helper MUST issue a `head()` on that key; the fault is the
4308 // witness. A regression that turned the helper into a literal
4309 // no-op would leave the fault unconsumed and fail the
4310 // pending-faults assertion below. Without this witness, the
4311 // "marker absent → marker absent" assertion is vacuous.
4312 store.arm(Fault::NetworkOnHead {
4313 key: "repo/refs/heads/main/PROTECTED#".to_owned(),
4314 });
4315 verify_no_orphan_protected_after_delete(&store, Some("repo"), &r).await;
4316 assert_eq!(
4317 store.pending_faults(),
4318 0,
4319 "helper must call head() on the marker key — fault unconsumed",
4320 );
4321 // The transient HEAD error goes through the `debug!` branch and
4322 // the helper returns silently; the bucket stays unchanged.
4323 assert!(
4324 !store.contains("repo/refs/heads/main/PROTECTED#"),
4325 "helper must not touch the bucket",
4326 );
4327 }
4328
4329 /// When the helper observes a marker — the lock-contract-violation
4330 /// branch — it logs at `error!` and returns silently. The bucket
4331 /// state must be unchanged (no rollback). This is the
4332 /// belt-and-suspenders branch the issue's race scenario would
4333 /// reach if a future regression bypassed the lock.
4334 #[tokio::test]
4335 async fn verify_no_orphan_protected_after_delete_does_not_mutate_when_marker_present() {
4336 let store = MockStore::new();
4337 let r = rn("refs/heads/main");
4338 let marker = "repo/refs/heads/main/PROTECTED#";
4339 store.insert(marker, Bytes::new());
4340 verify_no_orphan_protected_after_delete(&store, Some("repo"), &r).await;
4341 // The helper logs but does NOT delete the marker: rollback is
4342 // not the helper's job (the delete is already complete; the
4343 // operator-visible "ref is gone" outcome stands; the orphan
4344 // marker is surveillance telemetry).
4345 assert!(
4346 store.contains(marker),
4347 "helper must not delete the orphan marker — surveillance only",
4348 );
4349 }
4350}