use std::collections::HashSet;
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use bytes::Bytes;
use time::{Duration, OffsetDateTime};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::git::{self, GitError, RefName, RefNameError, Sha, ShaError, is_valid_ref_name};
use crate::keys;
use crate::object_store::{ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts};
use crate::packchain::gc::{tombstoned_bundle_keys, write_baseline_tombstone_best_effort};
use crate::packchain::schema::Sha40;
use crate::url::{BackendKind, StorageEngine};
/// Lock TTL, in seconds, used when the environment variable named by
/// `ENV_LOCK_TTL_SECONDS` is unset, unparsable, or zero.
pub const DEFAULT_LOCK_TTL_SECONDS: u64 = 60;
/// Settings shared by every ref handled in one push batch; built once in `push_batch`.
struct PushConfig {
/// Whether a zip archive is produced and uploaded next to the bundle.
zip: bool,
/// Storage engine whose name is written to the `FORMAT` key on push.
engine: StorageEngine,
/// Lock time-to-live passed to `acquire_lock`.
ttl: Duration,
/// Backend flavour; selects the user metadata attached to the zip upload.
kind: BackendKind,
}
/// Name of the environment variable that overrides the lock TTL, read as whole seconds.
pub(crate) const ENV_LOCK_TTL_SECONDS: &str = "GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS";
/// Marker phrase embedded in the "not ancestor" wire message.
pub(crate) const NOT_ANCESTOR_TOKEN: &str = "not ancestor";

/// Renders the wire message reported when the remote ref is not an ancestor
/// of `local_spec`.
pub(crate) fn not_ancestor_wire_message(local_spec: &str) -> String {
    format!(
        r#""remote ref is {} of {}."?"#,
        NOT_ANCESTOR_TOKEN, local_spec
    )
}
/// Wire message returned when a delete is refused because the ref's listing
/// contains a protection marker.
pub(crate) const DELETE_PROTECTION_MESSAGE: &str = r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#;
/// Failures that can occur while parsing or executing push commands.
#[derive(Debug, thiserror::Error)]
pub enum PushError {
/// The push command did not have the `[+]<src>:<dst>` shape.
#[error("invalid push command {line:?}: expected `[+]<src>:<dst>`")]
Parse {
/// The offending command arguments, verbatim.
line: String,
},
/// The non-empty local side of the ref-spec failed ref-name validation.
#[error("invalid local ref-spec: {0:?}")]
InvalidLocalSpec(String),
/// The remote side of the ref-spec was rejected by `RefName::new`.
#[error("invalid remote ref: {0}")]
RemoteRef(#[from] RefNameError),
/// A SHA could not be parsed.
#[error("invalid SHA in bundle key: {0}")]
Sha(#[from] ShaError),
/// An object-store operation failed.
#[error("object-store error during push: {0}")]
Store(#[from] ObjectStoreError),
/// A git operation failed.
#[error("git error during push: {0}")]
Git(#[from] GitError),
/// A local filesystem operation failed.
#[error("local I/O error during push: {0}")]
Io(#[from] std::io::Error),
/// The packchain engine reported an error.
#[error("packchain engine error during push: {0}")]
Packchain(#[from] crate::packchain::PackchainError),
}
/// Per-ref result of a push or delete, reported back to git as one status line.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushOutcome {
    /// The ref was pushed or deleted successfully.
    Ok {
        /// Remote ref the status refers to.
        remote_ref: String,
    },
    /// The ref was rejected or the operation failed.
    Error {
        /// Remote ref the status refers to.
        remote_ref: String,
        /// Reason reported to the user, already in wire form (`"…"?`).
        message: String,
    },
}

impl PushOutcome {
    /// Renders this outcome as exactly one newline-terminated status line
    /// (`ok <ref>` or `error <ref> <message>`).
    ///
    /// Error messages are frequently built from arbitrary error chains, which
    /// may contain line breaks. The status protocol is line-oriented, so an
    /// embedded `\n` or `\r` would split one status into several lines; each
    /// such character is therefore replaced by a single space.
    #[must_use]
    pub(crate) fn to_protocol_line(&self) -> String {
        match self {
            Self::Ok { remote_ref } => {
                format!("ok {}\n", Self::single_line(remote_ref))
            }
            Self::Error {
                remote_ref,
                message,
            } => format!(
                "error {} {}\n",
                Self::single_line(remote_ref),
                Self::single_line(message)
            ),
        }
    }

    /// Replaces every carriage return or line feed in `text` with a space.
    fn single_line(text: &str) -> String {
        text.replace(['\r', '\n'], " ")
    }
}
/// Parsed form of one `[+]<src>:<dst>` push command (see `parse_push_args`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PushSpec {
/// `true` when the source side carried a leading `+`.
pub(crate) force: bool,
/// Local side of the ref-spec with any `+` removed; empty for a delete.
pub(crate) local_spec: String,
/// Validated remote ref (the destination side).
pub(crate) remote_ref: RefName,
}
/// Parses the argument part of a push command, `[+]<src>:<dst>`.
///
/// An empty `<src>` denotes a delete. A leading `+` sets `force`.
///
/// # Errors
/// * [`PushError::Parse`] when the input is empty, contains a space, lacks a
///   `:`, or has an empty destination.
/// * [`PushError::InvalidLocalSpec`] when a non-empty source is not a valid
///   ref name.
/// * [`PushError::RemoteRef`] when the destination is rejected by `RefName::new`.
pub(crate) fn parse_push_args(args: &str) -> Result<PushSpec, PushError> {
    let malformed = || PushError::Parse {
        line: args.to_owned(),
    };

    if args.is_empty() || args.contains(' ') {
        return Err(malformed());
    }

    let (src, dst) = match args.split_once(':') {
        Some((src, dst)) if !dst.is_empty() => (src, dst),
        _ => return Err(malformed()),
    };

    let (force, src) = src
        .strip_prefix('+')
        .map_or((false, src), |rest| (true, rest));

    if !(src.is_empty() || is_valid_ref_name(src)) {
        return Err(PushError::InvalidLocalSpec(src.to_owned()));
    }

    Ok(PushSpec {
        force,
        local_spec: src.to_owned(),
        remote_ref: RefName::new(dst)?,
    })
}
/// Returns the listing prefix for `remote_ref` under the optional repository
/// `prefix`, as computed by `keys::ref_listing_prefix`.
pub(crate) fn ref_listing_prefix(prefix: Option<&str>, remote_ref: &RefName) -> String {
    let ref_name = remote_ref.as_str();
    keys::ref_listing_prefix(prefix, ref_name)
}
/// Returns the key of the per-ref lock object: the ref's listing prefix
/// followed by `LOCK#.lock`.
pub(crate) fn lock_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
    let mut key = ref_listing_prefix(prefix, remote_ref);
    key.push_str("LOCK#.lock");
    key
}
/// Returns the key of the per-ref zip archive: the ref's listing prefix
/// followed by `repo.zip`.
fn archive_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
    ref_listing_prefix(prefix, remote_ref) + "repo.zip"
}
/// Returns the key of the repository-level `HEAD` object under `prefix`.
pub(crate) fn head_key(prefix: Option<&str>) -> String {
    const HEAD_LEAF: &str = "HEAD";
    keys::join(prefix, HEAD_LEAF)
}
/// Reports whether `key` names a bundle, i.e. whether a SHA can be parsed
/// from its final path segment by `parse_remote_sha_from_key`.
fn is_bundle_candidate(key: &str) -> bool {
    matches!(parse_remote_sha_from_key(key), Some(_))
}
/// Lists the bundle objects stored for `remote_ref`, excluding tombstoned ones.
///
/// Non-bundle entries under the ref's prefix are dropped first. If nothing is
/// left, the tombstone set is never consulted. Otherwise keys contained in
/// `cached_hidden` are removed; when no cache is supplied, the set is fetched
/// with `tombstoned_bundle_keys`.
///
/// # Errors
/// Propagates failures from the listing and from the tombstone lookup.
async fn bundles_for_ref(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
    cached_hidden: Option<&HashSet<String>>,
) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
    let listing = ref_listing_prefix(prefix, remote_ref);
    let mut candidates: Vec<ObjectMeta> = store.list(&listing).await?;
    candidates.retain(|meta| is_bundle_candidate(&meta.key));
    if candidates.is_empty() {
        return Ok(candidates);
    }

    match cached_hidden {
        Some(hidden) => candidates.retain(|meta| !hidden.contains(&meta.key)),
        None => {
            let hidden = tombstoned_bundle_keys(store, prefix).await?;
            candidates.retain(|meta| !hidden.contains(&meta.key));
        }
    }
    Ok(candidates)
}
/// Reports whether `remote_ref` carries a protection marker.
///
/// Probes the exact marker key with `head`; a `NotFound` answer means the ref
/// is unprotected.
///
/// # Errors
/// Any `head` failure other than `NotFound` is returned unchanged.
pub(crate) async fn is_protected(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
) -> Result<bool, ObjectStoreError> {
    let marker = protected_marker_key(prefix, remote_ref);
    let probe = store.head(&marker).await;
    match probe {
        Err(ObjectStoreError::NotFound(_)) => Ok(false),
        other => other.map(|_| true),
    }
}
/// Returns the key of the protection marker for `remote_ref`: the ref's
/// listing prefix followed by `keys::PROTECTED_MARKER_SEGMENT`.
pub(crate) fn protected_marker_key(prefix: Option<&str>, remote_ref: &RefName) -> String {
    let listing = ref_listing_prefix(prefix, remote_ref);
    let marker = keys::PROTECTED_MARKER_SEGMENT;
    format!("{listing}{marker}")
}
/// Diagnostic probe run after a delete sweep: logs at error level if a
/// protection marker is still present for `remote_ref`.
///
/// Never fails. A `NotFound` probe is the silent, expected result; any other
/// probe failure is only logged at debug level.
pub(crate) async fn verify_no_orphan_protected_after_delete(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
) {
    let key = protected_marker_key(prefix, remote_ref);
    let probe_error = match store.head(&key).await {
        Ok(_) => {
            tracing::error!(
                key = %key,
                remote_ref = %remote_ref.as_str(),
                "delete path observed a PROTECTED# marker after sweep; the per-ref lock contract (#158, #159) should make this impossible — investigate for lock bypass or bucket-level inconsistency",
            );
            return;
        }
        Err(e) => e,
    };

    // Marker absent: the invariant holds, nothing to report.
    if matches!(probe_error, ObjectStoreError::NotFound(_)) {
        return;
    }

    tracing::debug!(
        key = %key,
        error = %probe_error,
        "post-sweep PROTECTED# probe failed; cannot verify orphan-marker invariant for this delete",
    );
}
/// Extracts the SHA encoded in a bundle key, or `None` when the key is not a
/// bundle key or its stem is not accepted by `Sha::from_hex`.
fn parse_remote_sha_from_key(key: &str) -> Option<Sha> {
    parse_remote_sha_stem_from_key(key).and_then(|stem| Sha::from_hex(stem).ok())
}
/// Returns the stem of the final path segment of `key` when that segment ends
/// in `.bundle` and the stem passes `keys::is_valid_bundle_stem`.
fn parse_remote_sha_stem_from_key(key: &str) -> Option<&str> {
    // Text after the last '/', or the whole key when it has no '/'.
    let file_name = key.rsplit_once('/').map_or(key, |(_, leaf)| leaf);
    file_name
        .strip_suffix(".bundle")
        .filter(|stem| keys::is_valid_bundle_stem(*stem))
}
/// Resolves the lock TTL from the environment.
///
/// Reads `ENV_LOCK_TTL_SECONDS`; a missing, unparsable, or zero value falls
/// back to `DEFAULT_LOCK_TTL_SECONDS`. The result is converted with
/// `saturating_duration_seconds`.
pub(crate) fn lock_ttl_from_env() -> Duration {
    #[cfg(any(test, feature = "test-util"))]
    let _read = crate::test_util::env_var_read_lock(ENV_LOCK_TTL_SECONDS);

    let configured = match env::var(ENV_LOCK_TTL_SECONDS) {
        Ok(raw) => raw.parse::<u64>().ok(),
        Err(_) => None,
    };
    let secs = match configured {
        Some(n) if n > 0 => n,
        _ => DEFAULT_LOCK_TTL_SECONDS,
    };
    saturating_duration_seconds(secs)
}
/// Converts a seconds count to a `Duration`, clamping values that do not fit
/// in an `i64` to `i64::MAX` seconds.
#[must_use]
pub(crate) fn saturating_duration_seconds(secs: u64) -> Duration {
    let clamped = match i64::try_from(secs) {
        Ok(value) => value,
        Err(_) => i64::MAX,
    };
    Duration::seconds(clamped)
}
/// Same as `lock_ttl_from_env`, expressed as whole seconds.
///
/// # Panics
/// Panics if the resolved TTL has a negative seconds count.
pub(crate) fn lock_ttl_from_env_seconds() -> u64 {
    let whole = lock_ttl_from_env().whole_seconds();
    u64::try_from(whole).expect("lock_ttl_from_env returns a non-negative seconds count")
}
pub(crate) fn resolve_lock_ttl_seconds(opt: Option<u64>) -> u64 {
opt.filter(|&n| n > 0)
.unwrap_or_else(lock_ttl_from_env_seconds)
}
/// Handle for a held per-ref lock together with its background heartbeat task.
///
/// `release` stops the heartbeat and deletes the lock object. Dropping the
/// guard only signals and aborts the heartbeat; it does not delete the lock.
#[must_use = "lock guards must be released; dropping leaks the lock until TTL"]
pub(crate) struct LockGuard {
/// Key of the lock object in the store.
lock_key: String,
/// Store used to delete the lock on release.
store: Arc<dyn ObjectStore>,
/// Sends `true` to ask the heartbeat task to stop.
shutdown: watch::Sender<bool>,
/// Heartbeat task handle; `None` once it has been taken for joining or aborting.
heartbeat: Option<JoinHandle<()>>,
}
/// Upper bound on how long `LockGuard::stop_heartbeat` waits for the heartbeat
/// task to finish before aborting it.
const HEARTBEAT_JOIN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
impl LockGuard {
    /// Stops the heartbeat, then deletes the lock object. A lock that is
    /// already gone counts as success.
    ///
    /// # Errors
    /// Returns the store error if the delete fails for any reason other than
    /// the key being absent.
    pub(crate) async fn release(mut self) -> Result<(), ObjectStoreError> {
        self.stop_heartbeat().await;
        let store = self.store.as_ref();
        delete_idempotent(store, &self.lock_key).await
    }

    /// Signals the heartbeat task to stop and waits up to
    /// `HEARTBEAT_JOIN_TIMEOUT` for it to finish; on timeout the task is
    /// aborted and a warning is logged.
    async fn stop_heartbeat(&mut self) {
        let _ = self.shutdown.send(true);
        if let Some(mut handle) = self.heartbeat.take() {
            let joined = tokio::time::timeout(HEARTBEAT_JOIN_TIMEOUT, &mut handle).await;
            if joined.is_err() {
                warn!(
                    key = %self.lock_key,
                    timeout_secs = HEARTBEAT_JOIN_TIMEOUT.as_secs(),
                    "lock heartbeat did not exit within join timeout; \
                     falling back to abort (in-flight PUT may still race \
                     the upcoming DELETE)",
                );
                handle.abort();
            }
        }
    }
}
impl Drop for LockGuard {
    /// Signals shutdown and aborts any heartbeat task still attached. The lock
    /// object itself is not deleted here.
    fn drop(&mut self) {
        let _ = self.shutdown.send(true);
        let Some(handle) = self.heartbeat.take() else {
            return;
        };
        handle.abort();
    }
}
/// Returns the heartbeat period for a lock with the given TTL: one third of
/// the TTL in whole seconds, and never less than one second.
pub(crate) fn heartbeat_interval(ttl: Duration) -> std::time::Duration {
    // Flooring the TTL at 3 seconds guarantees the quotient is at least 1.
    let floored = ttl.whole_seconds().max(3);
    let period_secs = u64::try_from(floored / 3)
        .expect("ttl.whole_seconds().max(3) / 3 is always >= 1 (non-negative)");
    std::time::Duration::from_secs(period_secs)
}
/// Tries to take the lock at `lock_key`.
///
/// Returns `Ok(Some(guard))` when the lock was created, and `Ok(None)` when
/// someone else holds it. An existing lock whose age (`now` minus its
/// `last_modified`) exceeds `ttl` is treated as stale: it is deleted and one
/// more creation attempt is made.
///
/// # Errors
/// Propagates store failures, except `NotFound` from the `head` probe, which
/// yields `Ok(None)`.
pub(crate) async fn acquire_lock(
    store: Arc<dyn ObjectStore>,
    lock_key: &str,
    ttl: Duration,
    now: OffsetDateTime,
) -> Result<Option<LockGuard>, ObjectStoreError> {
    let created = store.put_if_absent(lock_key, Bytes::new()).await?;
    if created {
        return Ok(Some(spawn_lock_guard(store, lock_key.to_owned(), ttl)));
    }

    let meta = match store.head(lock_key).await {
        Err(ObjectStoreError::NotFound(_)) => return Ok(None),
        other => other?,
    };

    let age = now - meta.last_modified;
    if age > ttl {
        debug!(key = %lock_key, age_secs = age.whole_seconds(), "deleting stale lock");
        delete_idempotent(store.as_ref(), lock_key).await?;
        let retaken = store.put_if_absent(lock_key, Bytes::new()).await?;
        return Ok(retaken.then(|| spawn_lock_guard(store, lock_key.to_owned(), ttl)));
    }

    Ok(None)
}
/// Builds a `LockGuard` for an already-created lock and spawns its heartbeat.
///
/// The heartbeat task rewrites the lock object with an empty body every
/// `heartbeat_interval(ttl)` until the shutdown channel fires or closes.
/// Refresh failures are logged and retried on the next tick.
fn spawn_lock_guard(store: Arc<dyn ObjectStore>, lock_key: String, ttl: Duration) -> LockGuard {
let interval = heartbeat_interval(ttl);
let task_store = Arc::clone(&store);
let task_key = lock_key.clone();
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
// Consume one tick before entering the loop (tokio documents the first
// tick of an interval as completing immediately), so the first refresh
// happens on a later tick.
tick.tick().await; loop {
tokio::select! {
// `biased` polls the branches in the order written, so a pending
// shutdown is seen before a ready tick.
biased;
_ = shutdown_rx.changed() => break,
_ = tick.tick() => {}
}
// Re-check the flag after a tick so no refresh is issued once shutdown
// has been requested.
if *shutdown_rx.borrow() {
break;
}
match task_store
.put_bytes(&task_key, Bytes::new(), PutOpts::default())
.await
{
Ok(()) => debug!(key = %task_key, "lock heartbeat refreshed"),
Err(e) => warn!(
key = %task_key,
error = %e,
"lock heartbeat refresh failed; will retry",
),
}
}
});
LockGuard {
lock_key,
store,
shutdown: shutdown_tx,
heartbeat: Some(handle),
}
}
pub(crate) async fn release_lock(guard: LockGuard) -> Result<(), ObjectStoreError> {
guard.release().await
}
/// Deletes `key`, treating "already absent" as success.
///
/// # Errors
/// Returns any delete failure other than `ObjectStoreError::NotFound`.
pub(crate) async fn delete_idempotent(
    store: &dyn ObjectStore,
    key: &str,
) -> Result<(), ObjectStoreError> {
    match store.delete(key).await {
        Err(e) if !matches!(e, ObjectStoreError::NotFound(_)) => Err(e),
        _ => Ok(()),
    }
}
/// Deletes the previous bundle at `prior_key`; a failure is only logged as a
/// warning and never propagated.
async fn delete_prior_bundle_best_effort(
    store: &dyn ObjectStore,
    remote_ref: &RefName,
    prior_key: &str,
) {
    let Err(e) = delete_idempotent(store, prior_key).await else {
        return;
    };
    warn!(
        ref_path = %remote_ref.as_str(),
        key = %prior_key,
        error = %e,
        "prior-bundle cleanup failed (new bundle already committed); \
         orphan key left for manual cleanup",
    );
}
/// Retires the previous bundle of a ref after a new one has been uploaded.
///
/// The preferred route is a baseline tombstone written with
/// `write_baseline_tombstone_best_effort`. When the prior key's stem cannot be
/// parsed, either SHA fails `Sha40` validation, or the tombstone write reports
/// failure, the prior bundle is deleted directly (best effort) instead.
async fn defer_prior_bundle_via_tombstone(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
    prior_key: &str,
    local_sha: Sha,
) {
    let prior_stem = match parse_remote_sha_stem_from_key(prior_key) {
        Some(stem) => stem,
        None => {
            warn!(
                ref_path = %remote_ref.as_str(),
                key = %prior_key,
                "prior bundle key does not parse as a valid SHA; falling back to synchronous delete",
            );
            delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
            return;
        }
    };

    let prior_checked = Sha40::try_new(prior_stem);
    let local_checked = Sha40::try_new(local_sha.to_string());
    let (prior_sha40, local_sha40) = match (prior_checked, local_checked) {
        (Ok(prior), Ok(local)) => (prior, local),
        _ => {
            warn!(
                ref_path = %remote_ref.as_str(),
                key = %prior_key,
                "prior or local sha failed Sha40 validation; falling back to synchronous delete",
            );
            delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
            return;
        }
    };

    let wrote = write_baseline_tombstone_best_effort(
        store,
        prefix,
        remote_ref,
        &prior_sha40,
        &local_sha40,
        "bundle-engine-force-push",
    )
    .await;

    if !wrote {
        delete_prior_bundle_best_effort(store, remote_ref, prior_key).await;
        return;
    }
    debug!(
        ref_path = %remote_ref.as_str(),
        key = %prior_key,
        "prior bundle deferred to gc sweep via baseline tombstone",
    );
}
/// Uploads the zip archive at `archive_path` to `zip_dest`; a failure is only
/// logged as a warning and never propagated.
async fn upload_zip_artifact_best_effort(
    store: &dyn ObjectStore,
    remote_ref: &RefName,
    zip_dest: &str,
    archive_path: &Path,
    opts: PutOpts,
) {
    let Err(e) = store.put_path(zip_dest, archive_path, opts).await else {
        return;
    };
    warn!(
        ref_path = %remote_ref.as_str(),
        key = %zip_dest,
        error = %e,
        "zip artifact upload failed (bundle already committed); \
         retry the push to re-upload the zip at the same key",
    );
}
/// Processes a batch of push commands in order and returns one outcome per
/// command.
///
/// Store, git, I/O and SHA errors raised while handling a ref are converted
/// into a `PushOutcome::Error` for that ref, and the batch continues.
///
/// # Errors
/// A command that fails to parse, or any other `PushError` variant from
/// `push_one`, aborts the whole batch.
pub(crate) async fn push_batch(
    ctx: &super::BatchCtx,
    kind: BackendKind,
    zip: bool,
    engine: StorageEngine,
    cmds: Vec<String>,
) -> Result<Vec<PushOutcome>, PushError> {
    let total = cmds.len();
    if total == 0 {
        return Ok(Vec::new());
    }
    debug!(count = total, "processing push batch");

    let config = PushConfig {
        zip,
        engine,
        ttl: lock_ttl_from_env(),
        kind,
    };

    let mut outcomes = Vec::with_capacity(total);
    for cmd in cmds {
        let spec = parse_push_args(&cmd)?;
        let remote_ref_str = spec.remote_ref.as_str().to_owned();
        let attempt = push_one(
            Arc::clone(&ctx.store),
            ctx.prefix.as_deref(),
            ctx.repo_dir.as_path(),
            &config,
            OffsetDateTime::now_utc(),
            spec,
        )
        .await;

        let outcome = match attempt {
            Ok(done) => done,
            // Per-ref failures are reported on the wire rather than aborting.
            Err(
                e @ (PushError::Store(_)
                | PushError::Git(_)
                | PushError::Io(_)
                | PushError::Sha(_)),
            ) => {
                let chain = full_error_chain(&e);
                warn!(ref = %remote_ref_str, error = %chain, "push ref failed");
                PushOutcome::Error {
                    remote_ref: remote_ref_str,
                    message: format!(r#""{chain}"?"#),
                }
            }
            Err(fatal) => return Err(fatal),
        };
        outcomes.push(outcome);
    }
    Ok(outcomes)
}
/// Renders `err` via its `Display` text and then lets
/// `super::append_source_chain` extend that string.
fn full_error_chain(err: &PushError) -> String {
    let mut rendered = err.to_string();
    super::append_source_chain(&mut rendered, err);
    rendered
}
/// Expected, user-facing rejections found by `local_git_work`, as opposed to
/// hard `GitError` failures.
enum GitProbeError {
/// The local ref-spec could not be resolved to a SHA.
LocalRefNotFound,
/// The push is not forced and the remote SHA is not an ancestor of the local SHA.
NotAncestor,
}
/// Result of the local repository inspection performed by `local_git_work`.
struct LocalGit {
/// SHA the local ref-spec resolved to.
local_sha: Sha,
/// The repository's work directory, or its git directory when it has none.
cwd: PathBuf,
/// Archive data; present only when zip output was requested.
zip_artifacts: Option<ZipArtifacts>,
/// `true` when there was no remote bundle, or its SHA is an ancestor of `local_sha`.
pre_existing_was_ancestor: bool,
}
/// Zip archive produced for a push, plus the values used to label its upload.
struct ZipArtifacts {
/// Path of the archive file, created under `_tempdir`.
archive_path: PathBuf,
/// First 8 hex characters of the pushed SHA; used in the download file name.
short_sha: String,
/// Last commit message (empty if it could not be read); used as S3 user metadata.
commit_msg: String,
/// Owns the temporary directory holding the archive for as long as this value lives.
_tempdir: tempfile::TempDir,
}
/// Everything `prepare_push` gathered before the lock is taken and that
/// `perform_push_under_lock` needs to commit the push.
struct PushReadyState {
/// Destination ref.
remote_ref: RefName,
/// SHA being pushed.
local_sha: Sha,
/// Key of the bundle seen on the remote before locking, if any.
pre_existing: Option<String>,
/// Path of the freshly created bundle file (inside `_temp_dir`).
bundle_path: PathBuf,
/// Archive to upload next to the bundle, when zip output is enabled.
zip_artifacts: Option<ZipArtifacts>,
/// Engine name to record in the `FORMAT` key.
engine: StorageEngine,
/// Whether the push was requested with `+`.
force: bool,
/// Whether the pre-lock remote SHA (if any) was an ancestor of `local_sha`.
pre_existing_was_ancestor: bool,
/// Local ref-spec, used in wire messages.
local_spec: String,
/// Tombstoned bundle keys fetched before locking; reused for the under-lock listing.
hidden_bundles: HashSet<String>,
/// Owns the temporary directory holding the bundle for as long as this value lives.
_temp_dir: tempfile::TempDir,
}
/// What `prepare_push` decided should happen next for a ref.
enum PrepareOutcome {
/// A bundle is ready; continue with the upload under the ref lock.
Ready(Box<PushReadyState>),
/// The local side was empty: delete the remote ref under the ref lock.
Delete { remote_ref: RefName, zip: bool },
/// The outcome is already known; no lock or upload is needed.
Done(PushOutcome),
}
/// Inspects the local repository for a push of `local_spec`.
///
/// Resolves the spec to a SHA, checks ancestry against `pre_existing_sha`
/// (a missing remote SHA counts as an ancestor), and, when `zip` is set,
/// produces an archive in a fresh temporary directory.
///
/// Returns `Ok(Err(_))` for the expected rejections: unresolvable local ref,
/// or a non-forced push whose remote SHA is not an ancestor.
///
/// # Errors
/// Returns `GitError` when opening the repository, the ancestry check, the
/// temporary directory creation, or archiving fails.
fn local_git_work(
    repo_dir: &Path,
    local_spec: &str,
    pre_existing_sha: Option<Sha>,
    force_push: bool,
    zip: bool,
) -> Result<Result<LocalGit, GitProbeError>, GitError> {
    let repo = gix::open(repo_dir)?;
    let base_dir = match repo.workdir() {
        Some(dir) => dir,
        None => repo.git_dir(),
    };
    let cwd = base_dir.to_owned();

    let local_sha = match git::branch::resolve(&repo, local_spec) {
        Ok(sha) => sha,
        Err(_) => return Ok(Err(GitProbeError::LocalRefNotFound)),
    };

    let pre_existing_was_ancestor = if let Some(remote_sha) = pre_existing_sha {
        git::is_ancestor(&repo, remote_sha, local_sha)?
    } else {
        true
    };
    if !(force_push || pre_existing_was_ancestor) {
        return Ok(Err(GitProbeError::NotAncestor));
    }

    let mut zip_artifacts = None;
    if zip {
        let tempdir = tempfile::Builder::new()
            .prefix("git_remote_object_store_archive_")
            .tempdir()?;
        let archive_path = git::archive(&repo, tempdir.path(), local_spec)?;
        let commit_msg = git::last_commit_message(&repo).unwrap_or_default();
        let sha_hex = local_sha.to_string();
        zip_artifacts = Some(ZipArtifacts {
            archive_path,
            short_sha: sha_hex[..8].to_owned(),
            commit_msg,
            _tempdir: tempdir,
        });
    }

    drop(repo);
    Ok(Ok(LocalGit {
        local_sha,
        cwd,
        zip_artifacts,
        pre_existing_was_ancestor,
    }))
}
/// Performs all work for one ref that does not require the ref lock.
///
/// Returns `Delete` for an empty local spec, `Done` when the outcome is
/// already known (multiple or unparsable remote bundles, missing local ref,
/// non-fast-forward without force), or `Ready` with the bundle file and the
/// pre-lock view of the remote.
///
/// # Errors
/// Propagates store, git and temporary-directory failures.
async fn prepare_push(
store: &dyn ObjectStore,
prefix: Option<&str>,
repo_dir: &Path,
config: &PushConfig,
spec: PushSpec,
) -> Result<PrepareOutcome, PushError> {
let PushSpec {
force,
local_spec,
remote_ref,
} = spec;
let remote_ref_str = remote_ref.as_str().to_owned();
// An empty source side means "delete the remote ref".
if local_spec.is_empty() {
return Ok(PrepareOutcome::Delete {
remote_ref,
zip: config.zip,
});
}
let force_push = force;
debug!(local = %local_spec, remote = %remote_ref, force_push, "push");
// Fetch the tombstone set once; it is reused here and again under the lock.
let hidden_bundles = tombstoned_bundle_keys(store, prefix).await?;
let pre_bundles = bundles_for_ref(store, prefix, &remote_ref, Some(&hidden_bundles)).await?;
if pre_bundles.len() > 1 {
return Ok(PrepareOutcome::Done(PushOutcome::Error {
remote_ref: remote_ref_str,
message:
r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
.to_owned(),
}));
}
let pre_existing = pre_bundles.into_iter().next().map(|m| m.key);
let pre_existing_sha = match pre_existing.as_deref() {
None => None,
Some(key) => {
let Some(s) = parse_remote_sha_from_key(key) else {
return Ok(PrepareOutcome::Done(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""unable to parse remote bundle key {key:?}; run git-remote-object-store doctor to fix."?"#,
),
}));
};
Some(s)
}
};
// Resolve the local ref, check ancestry, and build the zip archive if requested.
let probe = local_git_work(
repo_dir,
&local_spec,
pre_existing_sha,
force_push,
config.zip,
)?;
let local = match probe {
Ok(local) => local,
Err(GitProbeError::LocalRefNotFound) => {
return Ok(PrepareOutcome::Done(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(r#""{local_spec} not found"?"#),
}));
}
Err(GitProbeError::NotAncestor) => {
return Ok(PrepareOutcome::Done(PushOutcome::Error {
remote_ref: remote_ref_str,
message: not_ancestor_wire_message(&local_spec),
}));
}
};
// The bundle is written into a temp dir that travels with the ready state.
let temp_dir = tempfile::Builder::new()
.prefix("git_remote_object_store_push_")
.tempdir()?;
let bundle_path =
git::bundle_at(&local.cwd, temp_dir.path(), local.local_sha, &local_spec).await?;
Ok(PrepareOutcome::Ready(Box::new(PushReadyState {
remote_ref,
local_sha: local.local_sha,
pre_existing,
bundle_path,
zip_artifacts: local.zip_artifacts,
engine: config.engine,
force,
pre_existing_was_ancestor: local.pre_existing_was_ancestor,
local_spec,
hidden_bundles,
_temp_dir: temp_dir,
})))
}
/// Handles one push or delete command end to end: prepare, take the ref lock,
/// do the work under the lock, release the lock.
///
/// A lock that cannot be acquired is reported as a `PushOutcome::Error`. A
/// failed lock release turns an otherwise successful outcome into an error
/// outcome; when the work itself did not succeed, the work's result is kept
/// and the release failure is only logged.
///
/// # Errors
/// Propagates errors from preparation, lock acquisition, and the locked work.
async fn push_one(
store: Arc<dyn ObjectStore>,
prefix: Option<&str>,
repo_dir: &Path,
config: &PushConfig,
now: OffsetDateTime,
spec: PushSpec,
) -> Result<PushOutcome, PushError> {
let (remote_ref_str, work): (String, UnderLockWork) =
match prepare_push(store.as_ref(), prefix, repo_dir, config, spec).await? {
// Outcome already decided without touching the lock.
PrepareOutcome::Done(o) => return Ok(o),
PrepareOutcome::Ready(state) => (
state.remote_ref.as_str().to_owned(),
UnderLockWork::Push(state),
),
PrepareOutcome::Delete { remote_ref, zip } => (
remote_ref.as_str().to_owned(),
UnderLockWork::Delete { remote_ref, zip },
),
};
let lock = match &work {
UnderLockWork::Push(state) => lock_key(prefix, &state.remote_ref),
UnderLockWork::Delete { remote_ref, .. } => lock_key(prefix, remote_ref),
};
let Some(guard) = acquire_lock(Arc::clone(&store), &lock, config.ttl, now).await? else {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""failed to acquire ref lock at {lock}. Another client may be pushing or deleting. If this persists beyond {}s, run git-remote-object-store doctor to inspect and optionally clear stale locks."?"#,
config.ttl.whole_seconds(),
),
});
};
let result = match work {
UnderLockWork::Push(state) => {
perform_push_under_lock(store.as_ref(), prefix, config.kind, *state).await
}
UnderLockWork::Delete { remote_ref, zip } => {
delete_remote_ref_under_lock(store.as_ref(), prefix, &remote_ref, zip, &lock).await
}
};
// Always attempt the release, whatever the work returned.
let release_result = release_lock(guard).await;
match (&result, release_result) {
// The work succeeded but the lock is stuck: surface that to the user.
(Ok(PushOutcome::Ok { .. }), Err(e)) => {
warn!(key = %lock, error = %e, "failed to release lock");
Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""failed to release lock. You may need to manually remove the lock {lock} from the server or use git-remote-object-store doctor to fix."?"#,
),
})
}
// The work already reported a problem; keep it and only log the release failure.
(_, Err(e)) => {
warn!(key = %lock, error = %e, "lock release failed (work already errored)");
result
}
_ => result,
}
}
/// The operation `push_one` runs while holding the per-ref lock.
enum UnderLockWork {
/// Upload the prepared bundle (see `perform_push_under_lock`).
Push(Box<PushReadyState>),
/// Delete the remote ref (see `delete_remote_ref_under_lock`).
Delete { remote_ref: RefName, zip: bool },
}
/// Commits a prepared push; called by `push_one` while it holds the ref lock.
///
/// Steps, in order: refuse a forced non-fast-forward onto a protected ref;
/// re-list the ref's bundles and compare with the pre-lock view; upload the
/// bundle; create `HEAD` and `FORMAT` if absent; retire the previous bundle;
/// optionally upload the zip archive (best effort).
///
/// # Errors
/// Propagates store failures from the protection probe, the listing, the
/// bundle upload and the `HEAD`/`FORMAT` writes. Prior-bundle retirement and
/// zip upload failures are not propagated.
async fn perform_push_under_lock(
store: &dyn ObjectStore,
prefix: Option<&str>,
kind: BackendKind,
state: PushReadyState,
) -> Result<PushOutcome, PushError> {
// `_temp_dir` is bound by name, so it stays alive until this function returns.
let PushReadyState {
remote_ref,
local_sha,
pre_existing,
bundle_path,
zip_artifacts,
engine,
force,
pre_existing_was_ancestor,
local_spec,
hidden_bundles,
_temp_dir,
} = state;
let remote_ref_str = remote_ref.as_str().to_owned();
// A forced history rewrite is rejected with the ordinary "not ancestor"
// message when the ref is protected.
if force && !pre_existing_was_ancestor && is_protected(store, prefix, &remote_ref).await? {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: not_ancestor_wire_message(&local_spec),
});
}
let current = bundles_for_ref(store, prefix, &remote_ref, Some(&hidden_bundles)).await?;
if current.len() > 1 {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#.to_owned(),
});
}
let current_key = current.into_iter().next().map(|m| m.key);
// The remote changed between preparation and locking: the prepared bundle is stale.
if pre_existing.as_deref() != current_key.as_deref() {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: r#""stale remote. Please fetch and retry."?"#.to_owned(),
});
}
let bundle_dest = keys::bundle_key(prefix, &remote_ref, local_sha);
// The size is only used for progress reporting; a metadata failure yields `None`.
let bundle_total = tokio::fs::metadata(&bundle_path)
.await
.map(|m| m.len())
.ok();
let bundle_opts = PutOpts {
progress: Some(bundle_progress_sink(&bundle_dest, bundle_total)),
..PutOpts::default()
};
store
.put_path(&bundle_dest, &bundle_path, bundle_opts)
.await?;
// HEAD and FORMAT are written only if they do not exist yet.
let head = head_key(prefix);
store
.put_if_absent(
&head,
Bytes::copy_from_slice(remote_ref.as_str().as_bytes()),
)
.await?;
let format_key = keys::join(prefix, "FORMAT");
store
.put_if_absent(&format_key, Bytes::from_static(engine.as_str().as_bytes()))
.await?;
// Retire the previous bundle unless the new upload reused the same key.
if let Some(prev) = current_key
&& prev != bundle_dest
{
defer_prior_bundle_via_tombstone(store, prefix, &remote_ref, &prev, local_sha).await;
}
if let Some(artifacts) = zip_artifacts {
let zip_dest = archive_key(prefix, &remote_ref);
let zip_total = tokio::fs::metadata(&artifacts.archive_path)
.await
.map(|m| m.len())
.ok();
// Only the S3 backend gets the revision-summary metadata entry.
let user_metadata = match kind {
BackendKind::S3 => vec![(
"codepipeline-artifact-revision-summary".to_owned(),
sanitize_metadata_value(&artifacts.commit_msg),
)],
BackendKind::Azure => Vec::new(),
};
let opts = PutOpts {
content_disposition: Some(format!(
"attachment; filename=repo-{}.zip",
artifacts.short_sha
)),
user_metadata,
progress: Some(bundle_progress_sink(&zip_dest, zip_total)),
};
upload_zip_artifact_best_effort(
store,
&remote_ref,
&zip_dest,
&artifacts.archive_path,
opts,
)
.await;
}
Ok(PushOutcome::Ok {
remote_ref: remote_ref_str,
})
}
/// Creates a progress sink that logs an `info` event per reported chunk for
/// the upload of `key`.
///
/// Each event carries the running byte total and the chunk size; `total` is
/// included as a field only when it is known.
pub(crate) fn bundle_progress_sink(key: &str, total: Option<u64>) -> ProgressSink {
    let label = key.to_owned();
    let uploaded = Arc::new(AtomicU64::new(0));
    ProgressSink::new(move |bytes_amount| {
        // `fetch_add` returns the previous value; add the chunk to get the new total.
        let before = uploaded.fetch_add(bytes_amount, Ordering::Relaxed);
        let so_far = before.saturating_add(bytes_amount);
        match total {
            Some(t) => info!(
                key = %label,
                bytes_so_far = so_far,
                total = t,
                bytes_chunk = bytes_amount,
                "uploading"
            ),
            None => info!(
                key = %label,
                bytes_so_far = so_far,
                bytes_chunk = bytes_amount,
                "uploading"
            ),
        }
    })
}
/// Returns a copy of `s` in which every control character is replaced by a
/// single space; all other characters are kept unchanged.
fn sanitize_metadata_value(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        cleaned.push(if ch.is_control() { ' ' } else { ch });
    }
    cleaned
}
/// Number of non-lock entries a ref must have for a delete to proceed when zip
/// output is disabled (presumably just the bundle; confirm against the push path).
const DELETE_EXPECTED_NO_ZIP: usize = 1;
/// Number of non-lock entries a ref must have for a delete to proceed when zip
/// output is enabled (presumably the bundle plus `repo.zip`; confirm against the push path).
const DELETE_EXPECTED_WITH_ZIP: usize = 2;
/// Deletes every object of `remote_ref`; called by `push_one` while it holds
/// the ref lock.
///
/// The ref's prefix is listed and the lock object itself is ignored. The
/// delete is refused when the listing contains a protection marker. It
/// proceeds only when the number of remaining entries equals the expected
/// count for the `zip` setting. Otherwise an error outcome is returned:
/// "not found" for an empty listing, "multiple bundles" for anything else.
///
/// # Errors
/// Propagates store failures from the listing and from the individual deletes.
async fn delete_remote_ref_under_lock(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
    remote_ref: &RefName,
    zip: bool,
    lock_key: &str,
) -> Result<PushOutcome, PushError> {
    let listing = ref_listing_prefix(prefix, remote_ref);
    let all_entries = store.list(&listing).await?;
    let remote_ref_str = remote_ref.as_str().to_owned();

    if keys::entries_have_protected_marker(&all_entries) {
        return Ok(PushOutcome::Error {
            remote_ref: remote_ref_str,
            message: DELETE_PROTECTION_MESSAGE.to_owned(),
        });
    }

    // Everything under the ref except the lock this caller holds.
    let entries: Vec<&ObjectMeta> = all_entries.iter().filter(|e| e.key != lock_key).collect();
    let expected = match zip {
        true => DELETE_EXPECTED_WITH_ZIP,
        false => DELETE_EXPECTED_NO_ZIP,
    };

    let message = if entries.len() == expected {
        for entry in entries.iter().copied() {
            delete_idempotent(store, &entry.key).await?;
        }
        verify_no_orphan_protected_after_delete(store, prefix, remote_ref).await;
        return Ok(PushOutcome::Ok {
            remote_ref: remote_ref_str,
        });
    } else if entries.is_empty() {
        r#""not found"?"#
    } else {
        r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#
    };

    Ok(PushOutcome::Error {
        remote_ref: remote_ref_str,
        message: message.to_owned(),
    })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::object_store::mock::MockStore;
use crate::packchain::gc::baseline_tombstone_listing_prefix;
const SHA: &str = "0123456789abcdef0123456789abcdef01234567";
const OTHER_SHA: &str = "ffffffffffffffffffffffffffffffffffffffff";
const _: () = {
let a = SHA.as_bytes();
let b = OTHER_SHA.as_bytes();
let mut i = 0;
let mut differs = a.len() != b.len();
while i < a.len() && i < b.len() {
if a[i] != b[i] {
differs = true;
}
i += 1;
}
assert!(differs, "OTHER_SHA must differ from SHA");
};
fn rn(s: &str) -> RefName {
RefName::new(s).expect("RefName")
}
#[test]
fn parse_push_args_accepts_canonical_form() {
let spec = parse_push_args("refs/heads/main:refs/heads/main").expect("parse");
assert!(!spec.force);
assert_eq!(spec.local_spec, "refs/heads/main");
assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
}
#[test]
fn parse_push_args_accepts_force_flag() {
let spec = parse_push_args("+refs/heads/main:refs/heads/main").expect("parse");
assert!(spec.force);
assert_eq!(spec.local_spec, "refs/heads/main");
}
#[test]
fn parse_push_args_accepts_delete_form() {
let spec = parse_push_args(":refs/heads/main").expect("parse");
assert!(!spec.force);
assert!(spec.local_spec.is_empty());
assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
}
#[test]
fn parse_push_args_accepts_force_delete_form() {
let spec = parse_push_args("+:refs/heads/main").expect("parse");
assert!(spec.force);
assert!(spec.local_spec.is_empty());
assert_eq!(spec.remote_ref.as_str(), "refs/heads/main");
}
#[test]
fn parse_push_args_accepts_short_local() {
let spec = parse_push_args("HEAD:refs/heads/main").expect("parse");
assert_eq!(spec.local_spec, "HEAD");
}
#[test]
fn parse_push_args_rejects_missing_colon() {
assert!(matches!(
parse_push_args("refs/heads/main"),
Err(PushError::Parse { .. })
));
}
#[test]
fn parse_push_args_rejects_empty_remote() {
assert!(matches!(
parse_push_args("refs/heads/main:"),
Err(PushError::Parse { .. })
));
}
#[test]
fn parse_push_args_rejects_invalid_remote_ref() {
assert!(matches!(
parse_push_args("refs/heads/main:refs/heads/.bad"),
Err(PushError::RemoteRef(_))
));
}
#[test]
fn parse_push_args_rejects_invalid_local_spec() {
assert!(matches!(
parse_push_args("refs/heads/.bad:refs/heads/main"),
Err(PushError::InvalidLocalSpec(_))
));
}
#[test]
fn parse_push_args_rejects_embedded_whitespace() {
assert!(matches!(
parse_push_args("refs/heads/main:refs/heads/main extra"),
Err(PushError::Parse { .. })
));
}
#[test]
fn parse_push_args_rejects_empty_input() {
assert!(matches!(parse_push_args(""), Err(PushError::Parse { .. })));
}
#[test]
fn key_formatters_with_prefix() {
let r = rn("refs/heads/main");
let sha = Sha::from_hex(SHA).unwrap();
assert_eq!(
keys::bundle_key(Some("repo"), &r, sha),
format!("repo/refs/heads/main/{SHA}.bundle"),
);
assert_eq!(
lock_key(Some("repo"), &r),
"repo/refs/heads/main/LOCK#.lock"
);
assert_eq!(
archive_key(Some("repo"), &r),
"repo/refs/heads/main/repo.zip"
);
assert_eq!(head_key(Some("repo")), "repo/HEAD");
}
#[test]
fn key_formatters_with_no_prefix() {
let r = rn("refs/heads/main");
let sha = Sha::from_hex(SHA).unwrap();
assert_eq!(
keys::bundle_key(None, &r, sha),
format!("refs/heads/main/{SHA}.bundle"),
);
assert_eq!(lock_key(None, &r), "refs/heads/main/LOCK#.lock");
assert_eq!(archive_key(None, &r), "refs/heads/main/repo.zip");
assert_eq!(head_key(None), "HEAD");
assert_eq!(head_key(Some("")), "HEAD");
assert_eq!(lock_key(Some(""), &r), "refs/heads/main/LOCK#.lock");
}
#[test]
fn is_bundle_candidate_keeps_real_bundles() {
assert!(is_bundle_candidate(&format!(
"repo/refs/heads/main/{SHA}.bundle"
)));
assert!(is_bundle_candidate(&format!(
"refs/heads/main/{SHA}.bundle"
)));
}
#[test]
fn is_bundle_candidate_rejects_protected_zip_lock() {
assert!(!is_bundle_candidate("repo/refs/heads/main/PROTECTED#"));
assert!(!is_bundle_candidate("repo/refs/heads/main/repo.zip"));
assert!(!is_bundle_candidate("repo/refs/heads/main/LOCK#.lock"));
assert!(!is_bundle_candidate("repo/refs/heads/main/file.lock"));
assert!(!is_bundle_candidate("repo/refs/heads/main/LOCKS/x"));
}
#[test]
fn is_bundle_candidate_keeps_refs_containing_zip_substring() {
assert!(is_bundle_candidate(&format!(
"repo/refs/heads/v1.zip-rc1/{SHA}.bundle"
)));
assert!(is_bundle_candidate(&format!(
"refs/heads/myrelease.zip-v1/{SHA}.bundle"
)));
}
#[test]
fn is_bundle_candidate_keeps_refs_containing_locks_substring() {
assert!(is_bundle_candidate(&format!(
"repo/refs/heads/LOCKS-feature/x/{SHA}.bundle"
)));
assert!(is_bundle_candidate(&format!(
"refs/heads/LOCKS/sub/{SHA}.bundle"
)));
}
#[test]
fn is_bundle_candidate_keeps_refs_containing_lock_substring() {
assert!(is_bundle_candidate(&format!(
"refs/heads/feature.lock-rc/{SHA}.bundle"
)));
}
#[test]
fn parse_remote_sha_from_key_extracts_lower_hex_40() {
let sha = parse_remote_sha_from_key(&format!("repo/refs/heads/main/{SHA}.bundle"))
.expect("parse");
assert_eq!(sha.to_string(), SHA);
}
#[test]
fn parse_remote_sha_from_key_rejects_uppercase() {
let upper = SHA.to_uppercase();
assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{upper}.bundle")).is_none());
}
#[test]
fn parse_remote_sha_from_key_rejects_wrong_length() {
let short = &SHA[..39];
assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{short}.bundle")).is_none());
}
#[test]
fn parse_remote_sha_from_key_rejects_missing_extension() {
assert!(parse_remote_sha_from_key(&format!("refs/heads/main/{SHA}")).is_none());
}
#[tokio::test]
async fn bundles_for_ref_filters_protected_zip_lock() {
let store = MockStore::new();
let r = rn("refs/heads/main");
store.insert(
format!("repo/refs/heads/main/{SHA}.bundle"),
Bytes::from_static(b"b"),
);
store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
store.insert("repo/refs/heads/main/repo.zip", Bytes::from_static(b""));
store.insert("repo/refs/heads/main/LOCK#.lock", Bytes::from_static(b""));
let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
.await
.unwrap();
assert_eq!(bundles.len(), 1);
assert!(bundles[0].key.ends_with(".bundle"));
}
#[tokio::test]
async fn bundles_for_ref_keeps_bundle_when_ref_name_contains_zip() {
let store = MockStore::new();
let r = rn("refs/heads/v1.zip-rc1");
let bundle_key = format!("repo/refs/heads/v1.zip-rc1/{SHA}.bundle");
store.insert(bundle_key.clone(), Bytes::from_static(b"b"));
let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
.await
.unwrap();
assert_eq!(bundles.len(), 1);
assert_eq!(bundles[0].key, bundle_key);
}
#[tokio::test]
async fn bundles_for_ref_keeps_bundle_when_ref_name_contains_locks() {
let store = MockStore::new();
let r = rn("refs/heads/LOCKS-feature/x");
let bundle_key = format!("repo/refs/heads/LOCKS-feature/x/{SHA}.bundle");
store.insert(bundle_key.clone(), Bytes::from_static(b"b"));
let bundles = bundles_for_ref(&store, Some("repo"), &r, None)
.await
.unwrap();
assert_eq!(bundles.len(), 1);
assert_eq!(bundles[0].key, bundle_key);
}
/// Passing a pre-computed hidden-bundle set to `bundles_for_ref` must avoid
/// both the `repo/gc/` listing and the tombstone body fetch.
///
/// The first call (no cache) is expected to list `repo/gc/` once and fetch one
/// tombstone body; the second call (with a cache) must leave both counters
/// untouched.
#[tokio::test]
async fn bundles_for_ref_skips_tombstone_lookup_when_cache_provided() {
use std::sync::atomic::{AtomicUsize, Ordering};
// Wrapper around `MockStore` that counts the two operations under test.
struct CountingStore {
inner: MockStore,
// Number of `list` calls whose prefix is exactly "repo/gc/".
gc_lists: AtomicUsize,
// Number of `get_bytes` calls on keys under the baseline-tombstone prefix.
tombstone_gets: AtomicUsize,
}
// NOTE(review): `delegate_to_inner_impl!` is defined elsewhere; the `forward:`
// list presumably delegates those methods to `self.inner` unchanged — confirm.
crate::delegate_to_inner_impl! {
impl ObjectStore for CountingStore {
forward: get_to_file, get_bytes_range,
put_bytes, put_if_absent,
head, copy, delete;
async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
if prefix == "repo/gc/" {
self.gc_lists.fetch_add(1, Ordering::SeqCst);
}
self.inner.list(prefix).await
}
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
if key.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))) {
self.tombstone_gets.fetch_add(1, Ordering::SeqCst);
}
self.inner.get_bytes(key).await
}
}
}
let inner = MockStore::new();
let r = rn("refs/heads/main");
// One live bundle for the ref under test.
inner.insert(
format!("repo/refs/heads/main/{SHA}.bundle"),
Bytes::from_static(b"b"),
);
// A tombstone body naming OTHER_SHA, i.e. not the live bundle above.
let tomb_body = format!(
r#"{{"v":1,"ref_name":"refs/heads/main","sha":"{OTHER_SHA}","marked_at":"2024-01-01T00:00:00Z"}}"#
);
inner.insert(
format!(
"{}test.json",
baseline_tombstone_listing_prefix(Some("repo"))
),
Bytes::from(tomb_body),
);
let store = CountingStore {
inner,
gc_lists: AtomicUsize::new(0),
tombstone_gets: AtomicUsize::new(0),
};
// Uncached call: one gc/ listing and one tombstone fetch are expected.
let bundles_uncached = bundles_for_ref(&store, Some("repo"), &r, None)
.await
.unwrap();
assert_eq!(bundles_uncached.len(), 1);
assert_eq!(store.gc_lists.load(Ordering::SeqCst), 1);
assert_eq!(store.tombstone_gets.load(Ordering::SeqCst), 1);
// Cached call: the caller supplies the hidden set, so the counters must not move.
let hidden: HashSet<String> = [format!("repo/refs/heads/main/{OTHER_SHA}.bundle")]
.into_iter()
.collect();
let bundles_cached = bundles_for_ref(&store, Some("repo"), &r, Some(&hidden))
.await
.unwrap();
assert_eq!(bundles_cached.len(), 1);
assert_eq!(
store.gc_lists.load(Ordering::SeqCst),
1,
"cached call must not re-list gc/",
);
assert_eq!(
store.tombstone_gets.load(Ordering::SeqCst),
1,
"cached call must not refetch any tombstone body",
);
}
/// `is_protected` reports `false` on an empty store and `true` once the
/// `PROTECTED#` marker key has been inserted.
#[tokio::test]
async fn is_protected_detects_marker() {
    let main_ref = rn("refs/heads/main");
    let store = MockStore::new();

    let before_marker = is_protected(&store, Some("repo"), &main_ref)
        .await
        .unwrap();
    assert!(!before_marker);

    store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
    let after_marker = is_protected(&store, Some("repo"), &main_ref)
        .await
        .unwrap();
    assert!(after_marker);
}
/// A key that only starts with `PROTECTED#` (here `PROTECTED#audit`) must not
/// be mistaken for the protection marker.
#[tokio::test]
async fn is_protected_ignores_protected_prefixed_sibling() {
    let main_ref = rn("refs/heads/main");
    let store = MockStore::new();
    let sibling_key = "repo/refs/heads/main/PROTECTED#audit";
    store.insert(sibling_key, Bytes::from_static(b""));

    let protected = is_protected(&store, Some("repo"), &main_ref)
        .await
        .unwrap();
    assert!(!protected);
}
/// With a list fault armed, `is_protected` must still answer correctly and the
/// fault must stay pending, both without and with the marker present.
#[tokio::test]
async fn is_protected_uses_head_not_list() {
    use crate::object_store::mock::Fault;
    const NO_LIST: &str = "is_protected must not call list";

    let main_ref = rn("refs/heads/main");
    let store = MockStore::new();
    store.arm(Fault::AccessDeniedOnAnyList);

    // Marker absent: answer is `false`, armed fault is still pending.
    let without_marker = is_protected(&store, Some("repo"), &main_ref)
        .await
        .unwrap();
    assert!(!without_marker);
    assert_eq!(store.pending_faults(), 1, "{}", NO_LIST);

    // Marker present: answer is `true`, armed fault is still pending.
    store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
    let with_marker = is_protected(&store, Some("repo"), &main_ref)
        .await
        .unwrap();
    assert!(with_marker);
    assert_eq!(store.pending_faults(), 1, "{}", NO_LIST);
}
/// Acquiring a lock on an empty store yields a guard and creates the key.
#[tokio::test]
async fn acquire_lock_succeeds_when_absent() {
    let store = Arc::new(MockStore::new());
    let acquired_at = OffsetDateTime::now_utc();
    let ttl = Duration::seconds(60);
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;

    let guard = acquire_lock(handle, "k", ttl, acquired_at).await.unwrap();

    assert!(guard.is_some(), "expected a fresh guard");
    assert!(store.contains("k"));
    drop(guard);
}
/// A lock key stamped at `now` is within the TTL, so acquisition must report
/// contention (`None`).
#[tokio::test]
async fn acquire_lock_returns_none_when_recently_held() {
    let store = Arc::new(MockStore::new());
    let held_at = OffsetDateTime::now_utc();
    store.insert_with("k", Bytes::new(), held_at, PutOpts::default());

    let ttl = Duration::seconds(60);
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let attempt = acquire_lock(handle, "k", ttl, held_at).await.unwrap();

    assert!(attempt.is_none(), "expected contention");
}
/// A lock key stamped 120 s in the past is older than the 60 s TTL and must be
/// taken over.
#[tokio::test]
async fn acquire_lock_recovers_stale_lock() {
    let store = Arc::new(MockStore::new());
    let current = OffsetDateTime::now_utc();
    let stamped_at = current - Duration::seconds(120);
    store.insert_with("k", Bytes::new(), stamped_at, PutOpts::default());

    let ttl = Duration::seconds(60);
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let recovered = acquire_lock(handle, "k", ttl, current).await.unwrap();

    assert!(recovered.is_some(), "stale lock must be recoverable");
    assert!(store.contains("k"));
    drop(recovered);
}
/// When the existing lock key answers "not found" on `head` (injected fault),
/// acquisition must return `None` rather than an error, and the fault must
/// have been consumed.
#[tokio::test]
async fn acquire_lock_treats_disappeared_lock_as_contention() {
    use crate::object_store::mock::Fault;

    let store = Arc::new(MockStore::new());
    store.insert("k", Bytes::new());
    store.arm(Fault::NotFoundOnHead { key: "k".into() });

    let ttl = Duration::seconds(60);
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let attempt = acquire_lock(handle, "k", ttl, OffsetDateTime::now_utc())
        .await
        .unwrap();

    assert!(attempt.is_none(), "expected contention on disappeared lock");
    assert_eq!(store.pending_faults(), 0);
}
/// Releasing a freshly acquired lock removes its key from the store.
#[tokio::test]
async fn release_lock_deletes_existing_key() {
    let store = Arc::new(MockStore::new());
    let ttl = Duration::seconds(60);
    let acquired_at = OffsetDateTime::now_utc();
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;

    let maybe_guard = acquire_lock(handle, "k", ttl, acquired_at).await.unwrap();
    let guard = maybe_guard.expect("acquire_lock must succeed on an empty store");

    release_lock(guard).await.unwrap();
    assert!(!store.contains("k"));
}
/// `release_lock` must return `Ok` even when the lock key has already been
/// removed from the store by someone else.
#[tokio::test]
async fn release_lock_swallows_not_found_when_lock_already_gone() {
let store = Arc::new(MockStore::new());
let now = OffsetDateTime::now_utc();
let guard = acquire_lock(
Arc::clone(&store) as Arc<dyn ObjectStore>,
"k",
Duration::seconds(60),
now,
)
.await
.unwrap()
.expect("acquire_lock must succeed");
// Abort the guard's heartbeat task and yield once before deleting the key.
// NOTE(review): presumably this keeps the heartbeat from re-writing "k"
// after the manual delete below — confirm against the heartbeat implementation.
guard.heartbeat.as_ref().unwrap().abort();
tokio::task::yield_now().await;
// Remove the key out from under the guard; the delete result is deliberately ignored.
let _ = store.delete("k").await;
// The release itself must not surface an error for the missing key.
release_lock(guard).await.unwrap();
}
/// A network failure on the release DELETE must surface as
/// `ObjectStoreError::Network`, consume the armed fault, and leave the key in
/// place.
#[tokio::test]
async fn release_lock_propagates_non_not_found_errors() {
    use crate::object_store::mock::Fault;

    let store = Arc::new(MockStore::new());
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let ttl = Duration::seconds(60);
    let guard = acquire_lock(handle, "k", ttl, OffsetDateTime::now_utc())
        .await
        .unwrap()
        .expect("acquire_lock must succeed");

    store.arm(Fault::NetworkOnDelete { key: "k".into() });
    let err = release_lock(guard).await.unwrap_err();

    let is_network = matches!(err, ObjectStoreError::Network(_));
    assert!(is_network, "expected Network error, got {err:?}");
    assert_eq!(store.pending_faults(), 0);
    assert!(store.contains("k"));
}
/// While the holder's guard is alive, a second `acquire_lock` on the same key
/// must see contention even after the tokio clock has advanced well past the
/// 4 s TTL (5 steps of 3 s each).
#[tokio::test(start_paused = true)]
async fn heartbeat_keeps_lock_alive_past_ttl() {
let store = Arc::new(MockStore::new());
let now = OffsetDateTime::now_utc();
let ttl = Duration::seconds(4);
let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
.await
.unwrap()
.expect("acquire must succeed");
// Advance the paused tokio clock in 3 s steps, yielding after each step so
// spawned tasks get a chance to run.
for _ in 0..5 {
tokio::time::advance(std::time::Duration::from_secs(3)).await;
tokio::task::yield_now().await;
}
// NOTE(review): `OffsetDateTime::now_utc()` reads the wall clock, which
// `tokio::time::advance` does not move, so `future` is close to `now`. If the
// mock stamps `last_modified` from the wall clock as well, this assertion may
// hold even without a heartbeat — confirm which clock `MockStore` uses.
let future = OffsetDateTime::now_utc();
let other = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, future)
.await
.unwrap();
assert!(
other.is_none(),
"live lock must not be stealable while the holder's heartbeat runs",
);
// Clean up: releasing the original guard removes the key.
release_lock(guard).await.unwrap();
assert!(!store.contains("k"));
}
/// After `release_lock`, advancing the paused clock must not cause the lock
/// key to reappear.
#[tokio::test(start_paused = true)]
async fn release_lock_stops_heartbeat() {
    let store = Arc::new(MockStore::new());
    let acquired_at = OffsetDateTime::now_utc();
    let ttl = Duration::seconds(4);
    let handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let guard = acquire_lock(handle, "k", ttl, acquired_at)
        .await
        .unwrap()
        .expect("acquire must succeed");

    release_lock(guard).await.unwrap();
    assert!(!store.contains("k"));

    // Give any surviving heartbeat ample (virtual) time to fire.
    let step = std::time::Duration::from_secs(3);
    for _ in 0..5 {
        tokio::time::advance(step).await;
        tokio::task::yield_now().await;
    }

    let key_reappeared = store.contains("k");
    assert!(
        !key_reappeared,
        "heartbeat must not re-create the key after release",
    );
}
/// Dropping the guard without releasing leaves the key in place but must stop
/// refreshing its `last_modified`; the orphaned key is then reclaimable once
/// the supplied "now" is past the TTL.
#[tokio::test(start_paused = true)]
async fn lock_guard_drop_aborts_heartbeat() {
    let store = Arc::new(MockStore::new());
    let acquired_at = OffsetDateTime::now_utc();
    let ttl = Duration::seconds(4);
    let first_handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let guard = acquire_lock(first_handle, "k", ttl, acquired_at)
        .await
        .unwrap()
        .expect("acquire must succeed");
    drop(guard);

    let stamp_after_drop = store.head("k").await.expect("lock present").last_modified;

    let step = std::time::Duration::from_secs(3);
    for _ in 0..5 {
        tokio::time::advance(step).await;
        tokio::task::yield_now().await;
    }

    let stamp_after_advance = store
        .head("k")
        .await
        .expect("lock still present")
        .last_modified;
    assert_eq!(
        stamp_after_drop, stamp_after_advance,
        "heartbeat must not refresh last_modified after drop",
    );

    // 120 s past the acquisition instant is beyond the 4 s TTL.
    let later = acquired_at + Duration::seconds(120);
    let second_handle = Arc::clone(&store) as Arc<dyn ObjectStore>;
    let recovered = acquire_lock(second_handle, "k", ttl, later)
        .await
        .unwrap();
    assert!(recovered.is_some(), "orphaned lock must be reclaimable");
    drop(recovered);
}
/// `release_lock` must wait for a heartbeat PUT that is already in flight
/// before it issues the DELETE for the lock key.
///
/// A wrapper store blocks the first `put_bytes` on the lock key behind a
/// `Notify` gate and records PUT/DELETE start/end events, so the test can
/// assert the exact operation order.
// The test body is long because the full `ObjectStore` impl is written out inline.
#[allow(clippy::too_many_lines)]
#[tokio::test(start_paused = true)]
async fn release_awaits_in_flight_heartbeat_put_before_delete() {
use std::sync::Mutex;
use tokio::sync::Notify;
// Events recorded by the wrapper store, in the order they happen.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Op {
PutStart,
PutEnd,
DeleteStart,
DeleteEnd,
}
// Store wrapper: forwards everything to `inner`, gates one PUT, logs PUT/DELETE.
struct GatedPutStore {
inner: Arc<MockStore>,
// The gated PUT waits on this until the test calls `notify_one`.
put_gate: Arc<Notify>,
log: Arc<Mutex<Vec<Op>>>,
// Only `put_bytes` calls on this key are eligible for gating.
gated_key: String,
// Set on the first eligible PUT so that only that one call is gated.
gate_consumed: std::sync::atomic::AtomicBool,
}
#[async_trait::async_trait]
impl ObjectStore for GatedPutStore {
async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
self.inner.list(prefix).await
}
async fn get_to_file(
&self,
key: &str,
dest: &std::path::Path,
opts: crate::object_store::GetOpts,
) -> Result<(), ObjectStoreError> {
self.inner.get_to_file(key, dest, opts).await
}
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
self.inner.get_bytes(key).await
}
async fn get_bytes_range(
&self,
key: &str,
range: std::ops::Range<u64>,
) -> Result<Bytes, ObjectStoreError> {
self.inner.get_bytes_range(key, range).await
}
async fn put_bytes(
&self,
key: &str,
body: Bytes,
opts: PutOpts,
) -> Result<(), ObjectStoreError> {
// `swap(true)` returns the previous value, so this is true only for the
// first `put_bytes` on the gated key.
let is_gated = key == self.gated_key
&& !self
.gate_consumed
.swap(true, std::sync::atomic::Ordering::SeqCst);
if is_gated {
self.log.lock().unwrap().push(Op::PutStart);
// Park here until the test opens the gate.
self.put_gate.notified().await;
}
let result = self.inner.put_bytes(key, body, opts).await;
if is_gated {
self.log.lock().unwrap().push(Op::PutEnd);
}
result
}
async fn put_path(
&self,
key: &str,
src: &std::path::Path,
opts: PutOpts,
) -> Result<(), ObjectStoreError> {
self.inner.put_path(key, src, opts).await
}
async fn put_if_absent(
&self,
key: &str,
body: Bytes,
) -> Result<bool, ObjectStoreError> {
self.inner.put_if_absent(key, body).await
}
async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
self.inner.head(key).await
}
async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
self.inner.copy(src, dst).await
}
async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
// Every delete is logged, regardless of key.
self.log.lock().unwrap().push(Op::DeleteStart);
let result = self.inner.delete(key).await;
self.log.lock().unwrap().push(Op::DeleteEnd);
result
}
}
let inner = Arc::new(MockStore::new());
let put_gate = Arc::new(Notify::new());
let log = Arc::new(Mutex::new(Vec::<Op>::new()));
let store = Arc::new(GatedPutStore {
inner: Arc::clone(&inner),
put_gate: Arc::clone(&put_gate),
log: Arc::clone(&log),
gated_key: "k".to_owned(),
gate_consumed: std::sync::atomic::AtomicBool::new(false),
});
let now = OffsetDateTime::now_utc();
let ttl = Duration::seconds(4);
// NOTE(review): acquisition completes without opening the gate, so it
// presumably does not go through `put_bytes` on "k" — confirm in `acquire_lock`.
let guard = acquire_lock(Arc::clone(&store) as Arc<dyn ObjectStore>, "k", ttl, now)
.await
.unwrap()
.expect("acquire must succeed");
// Let the freshly spawned tasks run before moving the clock.
for _ in 0..4 {
tokio::task::yield_now().await;
}
// NOTE(review): 2 s is half the 4 s TTL; presumably the heartbeat interval — confirm.
tokio::time::advance(std::time::Duration::from_secs(2)).await;
// Yield until the gated PUT has logged its start (bounded at 16 attempts).
for _ in 0..16 {
tokio::task::yield_now().await;
if !log.lock().unwrap().is_empty() {
break;
}
}
assert_eq!(
log.lock().unwrap().as_slice(),
&[Op::PutStart],
"heartbeat PUT must be in flight before release fires",
);
let release_store = Arc::clone(&store);
// Run the release concurrently; mentioning `release_store` inside the
// `async move` block moves that Arc clone into the spawned task.
let release_handle = tokio::spawn(async move {
let _ = release_store; release_lock(guard).await
});
// Give the release task several chances to (incorrectly) issue the DELETE.
for _ in 0..8 {
tokio::task::yield_now().await;
}
assert_eq!(
log.lock().unwrap().as_slice(),
&[Op::PutStart],
"release must NOT issue DELETE while heartbeat PUT is in flight",
);
// Open the gate: the PUT finishes, after which the release may delete.
put_gate.notify_one();
let result = release_handle.await.expect("release task panicked");
result.expect("release_lock");
let final_log = log.lock().unwrap().clone();
assert_eq!(
final_log,
vec![Op::PutStart, Op::PutEnd, Op::DeleteStart, Op::DeleteEnd],
"operation order must be: heartbeat PUT completes, THEN release DELETE",
);
assert!(
!inner.contains("k"),
"lock key must be deleted after release"
);
}
/// Deleting a ref that holds exactly one bundle succeeds, empties the ref
/// prefix, and writes nothing under `repo/gc/`.
#[tokio::test]
async fn delete_remote_ref_removes_single_bundle() {
    let main_ref = rn("refs/heads/main");
    let store = MockStore::new();
    store.insert(
        format!("repo/refs/heads/main/{SHA}.bundle"),
        Bytes::from_static(b"b"),
    );

    let lock_key = "repo/refs/heads/main/LOCK#.lock";
    let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &main_ref, false, lock_key)
        .await
        .unwrap();
    let expected = PushOutcome::Ok {
        remote_ref: "refs/heads/main".into(),
    };
    assert_eq!(outcome, expected);

    // Collects every stored key that starts with the given prefix.
    let keys_under = |prefix: &str| {
        store
            .keys()
            .into_iter()
            .filter(|key| key.starts_with(prefix))
            .collect::<Vec<_>>()
    };

    let remaining = keys_under("repo/refs/heads/main/");
    assert!(
        remaining.is_empty(),
        "ref prefix must be empty after delete: {remaining:?}",
    );

    let gc_keys = keys_under("repo/gc/");
    assert!(
        gc_keys.is_empty(),
        "bundle-engine delete must not write a tombstone: {gc_keys:?}",
    );
}
#[tokio::test]
async fn delete_remote_ref_returns_not_found_when_empty() {
let store = MockStore::new();
let r = rn("refs/heads/main");
let outcome = delete_remote_ref_under_lock(
&store,
Some("repo"),
&r,
false,
"repo/refs/heads/main/LOCK#.lock",
)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(message, r#""not found"?"#);
}
PushOutcome::Ok { .. } => panic!("expected Error outcome"),
}
}
#[tokio::test]
async fn delete_remote_ref_rejects_protected_marker() {
let store = MockStore::new();
let r = rn("refs/heads/main");
let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
let protected = "repo/refs/heads/main/PROTECTED#";
store.insert(&bundle, Bytes::from_static(b"b"));
store.insert(protected, Bytes::from_static(b""));
let outcome = delete_remote_ref_under_lock(
&store,
Some("repo"),
&r,
false,
"repo/refs/heads/main/LOCK#.lock",
)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(
message,
r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
);
}
PushOutcome::Ok { .. } => panic!("expected Error outcome"),
}
assert!(store.contains(&bundle));
assert!(store.contains(protected));
}
#[tokio::test]
async fn delete_remote_ref_reports_corruption_without_protected_marker() {
let store = MockStore::new();
let r = rn("refs/heads/main");
let bundle_a = format!("repo/refs/heads/main/{SHA}.bundle");
let bundle_b = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
store.insert(&bundle_a, Bytes::from_static(b"a"));
store.insert(&bundle_b, Bytes::from_static(b"b"));
let outcome = delete_remote_ref_under_lock(
&store,
Some("repo"),
&r,
false,
"repo/refs/heads/main/LOCK#.lock",
)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(
message,
r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#,
);
}
PushOutcome::Ok { .. } => panic!("expected Error outcome"),
}
assert!(store.contains(&bundle_a));
assert!(store.contains(&bundle_b));
}
#[tokio::test]
async fn delete_remote_ref_rejects_protected_marker_when_count_matches_zip() {
let store = MockStore::new();
let r = rn("refs/heads/main");
let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
let protected = "repo/refs/heads/main/PROTECTED#";
store.insert(&bundle, Bytes::from_static(b"b"));
store.insert(protected, Bytes::from_static(b""));
let outcome = delete_remote_ref_under_lock(
&store,
Some("repo"),
&r,
true,
"repo/refs/heads/main/LOCK#.lock",
)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(
message,
r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
);
}
PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
}
assert!(store.contains(&bundle), "bundle must survive");
assert!(store.contains(protected), "marker must survive");
}
#[tokio::test]
async fn delete_remote_ref_rejects_protected_marker_when_only_marker_present() {
let store = MockStore::new();
let r = rn("refs/heads/main");
let protected = "repo/refs/heads/main/PROTECTED#";
store.insert(protected, Bytes::from_static(b""));
let outcome = delete_remote_ref_under_lock(
&store,
Some("repo"),
&r,
false,
"repo/refs/heads/main/LOCK#.lock",
)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(
message,
r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
);
}
PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
}
assert!(store.contains(protected), "marker must survive");
}
#[tokio::test]
async fn delete_remote_ref_rejects_protect_landed_between_acquire_and_list() {
let store = MockStore::new();
let r = rn("refs/heads/main");
let bundle = format!("repo/refs/heads/main/{SHA}.bundle");
let lock_key = "repo/refs/heads/main/LOCK#.lock";
let protected = "repo/refs/heads/main/PROTECTED#";
store.insert(&bundle, Bytes::from_static(b"b"));
store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
store.insert(protected, Bytes::from_static(b""));
let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(
message,
r#""ref is protected. Run git-remote-object-store unprotect <url> <branch> to remove protection before deleting."?"#,
);
}
PushOutcome::Ok { .. } => panic!("expected protection-refusal Error"),
}
assert!(store.contains(&bundle), "bundle must survive");
assert!(store.contains(protected), "marker must survive");
assert!(store.contains(lock_key), "held lock must survive");
}
/// With the zip flag set, a bundle plus its `repo.zip` sibling is the expected
/// shape: delete succeeds and removes both keys.
#[tokio::test]
async fn delete_remote_ref_zip_mode_expects_two_keys() {
    let main_ref = rn("refs/heads/main");
    let bundle_key = format!("repo/refs/heads/main/{SHA}.bundle");
    let zip_key = "repo/refs/heads/main/repo.zip";
    let store = MockStore::new();
    store.insert(&bundle_key, Bytes::from_static(b"b"));
    store.insert(zip_key, Bytes::from_static(b""));

    let lock_key = "repo/refs/heads/main/LOCK#.lock";
    let zip_mode = true;
    let outcome =
        delete_remote_ref_under_lock(&store, Some("repo"), &main_ref, zip_mode, lock_key)
            .await
            .unwrap();

    let expected = PushOutcome::Ok {
        remote_ref: "refs/heads/main".into(),
    };
    assert_eq!(outcome, expected);
    assert!(!store.contains(&bundle_key));
    assert!(!store.contains(zip_key));
}
/// An `Ok` outcome renders as `ok <ref>` followed by a newline.
#[test]
fn push_outcome_renders_ok_line() {
    let outcome = PushOutcome::Ok {
        remote_ref: "refs/heads/main".into(),
    };
    let rendered = outcome.to_protocol_line();
    assert_eq!(rendered, "ok refs/heads/main\n");
}
/// An `Error` outcome renders as `error <ref> <message>` followed by a newline,
/// with the message emitted verbatim.
#[test]
fn push_outcome_renders_error_line() {
    let outcome = PushOutcome::Error {
        remote_ref: "refs/heads/main".into(),
        message: r#""bad"?"#.into(),
    };
    let rendered = outcome.to_protocol_line();
    assert_eq!(rendered, "error refs/heads/main \"bad\"?\n");
}
/// Both duplicate-bundle error messages (pre-lock and under-lock wording)
/// render to the same `error <ref> "<text>"?\n` wire shape.
#[test]
fn duplicate_bundle_errors_use_consistent_wire_format() {
    // Renders an `Error` outcome for refs/heads/main with the given message.
    let render = |message: &str| {
        PushOutcome::Error {
            remote_ref: "refs/heads/main".into(),
            message: message.to_owned(),
        }
        .to_protocol_line()
    };

    let pre_lock_line = render(
        r#""multiple bundles exist on server. Run git-remote-object-store doctor to fix."?"#,
    );
    let under_lock_line = render(
        r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#,
    );

    assert_eq!(
        pre_lock_line,
        "error refs/heads/main \"multiple bundles exist on server. \
         Run git-remote-object-store doctor to fix.\"?\n",
    );
    assert_eq!(
        under_lock_line,
        "error refs/heads/main \"multiple bundles exist for the same ref on server. \
         Run git-remote-object-store doctor to fix.\"?\n",
    );

    for line in [&pre_lock_line, &under_lock_line] {
        assert!(line.ends_with("\"?\n"));
    }
}
/// Exercises the lock-TTL resolution against the `ENV_LOCK_TTL_SECONDS`
/// environment variable: unset, non-numeric and `0` values fall back to
/// `DEFAULT_LOCK_TTL_SECONDS`, while a positive value (`120`) is honoured.
#[test]
fn lock_ttl_env_override_falls_back_for_unset_invalid_or_zero() {
// NOTE(review): `EnvGuard` is defined elsewhere; presumably it serialises
// access to the variable and restores it on drop — confirm in `test_util`.
let env = crate::test_util::EnvGuard::take(ENV_LOCK_TTL_SECONDS);
let default_ttl = Duration::seconds(i64::try_from(DEFAULT_LOCK_TTL_SECONDS).unwrap());
// Case 1: variable unset -> default everywhere.
env.clear();
assert_eq!(lock_ttl_from_env(), default_ttl);
assert_eq!(
resolve_lock_ttl_seconds(None),
DEFAULT_LOCK_TTL_SECONDS,
"None must defer to env-or-default",
);
assert_eq!(
resolve_lock_ttl_seconds(Some(0)),
DEFAULT_LOCK_TTL_SECONDS,
"Some(0) must not defeat per-ref locking (issue #208)",
);
// Case 2: unparsable value -> default.
env.set_to("not-a-number");
assert_eq!(lock_ttl_from_env(), default_ttl);
// Case 3: zero -> default.
env.set_to("0");
assert_eq!(lock_ttl_from_env(), default_ttl);
// Case 4: valid positive override -> used by both entry points.
env.set_to("120");
assert_eq!(lock_ttl_from_env(), Duration::seconds(120));
assert_eq!(
resolve_lock_ttl_seconds(None),
120,
"None must honour env override",
);
assert_eq!(
resolve_lock_ttl_seconds(Some(0)),
120,
"Some(0) must honour env override",
);
}
/// `u64::MAX` seconds does not fit in `i64`, so the result is pinned to
/// `i64::MAX` seconds.
#[test]
fn saturating_duration_seconds_caps_at_i64_max() {
    let capped = saturating_duration_seconds(u64::MAX);
    let ceiling = Duration::seconds(i64::MAX);
    assert_eq!(capped, ceiling);
}
/// An ordinary value (60) converts to the same number of seconds.
#[test]
fn saturating_duration_seconds_passes_normal_value() {
    let converted = saturating_duration_seconds(60);
    assert_eq!(converted, Duration::seconds(60));
}
/// Any positive explicit TTL (`1`, `120`, `u64::MAX`) is returned as-is.
#[test]
fn resolve_lock_ttl_some_positive_returns_unchanged() {
    for seconds in [1_u64, 120, u64::MAX] {
        assert_eq!(resolve_lock_ttl_seconds(Some(seconds)), seconds);
    }
}
async fn push_under_lock_with_bundle(
store: &MockStore,
prefix: Option<&str>,
engine: StorageEngine,
) -> PushOutcome {
let r = rn("refs/heads/main");
let temp_dir = tempfile::Builder::new()
.prefix("test_push_")
.tempdir()
.unwrap();
let bundle_path = temp_dir.path().join("bundle");
std::fs::write(&bundle_path, b"fake bundle").unwrap();
let state = PushReadyState {
remote_ref: r,
local_sha: Sha::from_hex(SHA).unwrap(),
pre_existing: None,
bundle_path,
zip_artifacts: None,
engine,
force: false,
pre_existing_was_ancestor: true,
local_spec: "refs/heads/main".to_owned(),
hidden_bundles: HashSet::new(),
_temp_dir: temp_dir,
};
perform_push_under_lock(store, prefix, BackendKind::S3, state)
.await
.unwrap()
}
/// A first push with prefix `repo` writes `repo/FORMAT` containing `bundle`.
#[tokio::test]
async fn perform_push_under_lock_writes_format_key_on_first_push() {
    let store = MockStore::new();
    let outcome =
        push_under_lock_with_bundle(&store, Some("repo"), StorageEngine::Bundle).await;

    let pushed_ok = matches!(outcome, PushOutcome::Ok { .. });
    assert!(pushed_ok, "expected Ok outcome");

    let format_key = "repo/FORMAT";
    assert!(
        store.contains(format_key),
        "FORMAT key must be written on the first push",
    );
    let format_body = store.get_bytes(format_key).await.unwrap();
    assert_eq!(format_body.as_ref(), b"bundle");
}
/// Without a prefix, the push writes `FORMAT` at the store root containing
/// `bundle`.
#[tokio::test]
async fn perform_push_under_lock_writes_format_key_without_prefix() {
    let store = MockStore::new();
    let outcome = push_under_lock_with_bundle(&store, None, StorageEngine::Bundle).await;

    let pushed_ok = matches!(outcome, PushOutcome::Ok { .. });
    assert!(pushed_ok, "expected Ok outcome");

    let format_key = "FORMAT";
    assert!(
        store.contains(format_key),
        "FORMAT key must be written at root when no prefix",
    );
    let format_body = store.get_bytes(format_key).await.unwrap();
    assert_eq!(format_body.as_ref(), b"bundle");
}
/// An existing `repo/FORMAT` body (`bundle\n`) is left byte-for-byte untouched
/// by a push.
#[tokio::test]
async fn perform_push_under_lock_format_key_is_idempotent() {
    let format_key = "repo/FORMAT";
    let store = MockStore::new();
    store.insert(format_key, Bytes::from_static(b"bundle\n"));

    let outcome =
        push_under_lock_with_bundle(&store, Some("repo"), StorageEngine::Bundle).await;
    let pushed_ok = matches!(outcome, PushOutcome::Ok { .. });
    assert!(pushed_ok, "expected Ok outcome");

    let format_body = store.get_bytes(format_key).await.unwrap();
    assert_eq!(format_body.as_ref(), b"bundle\n");
}
/// Builds a `PushReadyState` for `refs/heads/main` at `SHA` using the bundle
/// engine, backed by a temp directory containing a fake bundle file.
///
/// `pre_existing` is stored as given. The temp directory is moved into the
/// returned state's `_temp_dir` field.
fn push_state_with_pre_existing(pre_existing: Option<String>) -> PushReadyState {
    let remote_ref = rn("refs/heads/main");
    let scratch = tempfile::Builder::new()
        .prefix("test_push_")
        .tempdir()
        .unwrap();
    let bundle_file = scratch.path().join("bundle");
    std::fs::write(&bundle_file, b"fake bundle").unwrap();

    PushReadyState {
        remote_ref,
        local_sha: Sha::from_hex(SHA).unwrap(),
        pre_existing,
        bundle_path: bundle_file,
        zip_artifacts: None,
        engine: StorageEngine::Bundle,
        force: false,
        pre_existing_was_ancestor: true,
        local_spec: "refs/heads/main".to_owned(),
        hidden_bundles: HashSet::new(),
        _temp_dir: scratch,
    }
}
/// The push state recorded no bundle, but the store holds one: the push must
/// be rejected as a stale remote.
#[tokio::test]
async fn perform_push_under_lock_rejects_none_to_some_stale_remote() {
    let store = MockStore::new();
    let appeared_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
    store.insert(&appeared_key, Bytes::from_static(b"old bundle"));

    let outcome = perform_push_under_lock(
        &store,
        Some("repo"),
        BackendKind::S3,
        push_state_with_pre_existing(None),
    )
    .await
    .unwrap();

    let is_stale = matches!(
        &outcome,
        PushOutcome::Error { message, .. }
            if message == r#""stale remote. Please fetch and retry."?"#
    );
    assert!(is_stale, "expected stale-remote error, got {outcome:?}");
}
/// The push state recorded a bundle key, but the store is empty: the push must
/// be rejected as a stale remote.
#[tokio::test]
async fn perform_push_under_lock_rejects_some_to_none_stale_remote() {
    let store = MockStore::new();
    let vanished_key = format!("repo/refs/heads/main/{SHA}.bundle");

    let outcome = perform_push_under_lock(
        &store,
        Some("repo"),
        BackendKind::S3,
        push_state_with_pre_existing(Some(vanished_key)),
    )
    .await
    .unwrap();

    let is_stale = matches!(
        &outcome,
        PushOutcome::Error { message, .. }
            if message == r#""stale remote. Please fetch and retry."?"#
    );
    assert!(is_stale, "expected stale-remote error, got {outcome:?}");
}
/// The push state recorded one bundle key, but the store holds a different
/// one: the push must be rejected as a stale remote.
#[tokio::test]
async fn perform_push_under_lock_rejects_replaced_bundle_stale_remote() {
    let replacement_sha = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    let recorded_key = format!("repo/refs/heads/main/{SHA}.bundle");
    let replacement_key = format!("repo/refs/heads/main/{replacement_sha}.bundle");

    let store = MockStore::new();
    // Only the replacement is stored; the recorded key is absent.
    store.insert(&replacement_key, Bytes::from_static(b"new bundle"));

    let outcome = perform_push_under_lock(
        &store,
        Some("repo"),
        BackendKind::S3,
        push_state_with_pre_existing(Some(recorded_key)),
    )
    .await
    .unwrap();

    let is_stale = matches!(
        &outcome,
        PushOutcome::Error { message, .. }
            if message == r#""stale remote. Please fetch and retry."?"#
    );
    assert!(is_stale, "expected stale-remote error, got {outcome:?}");
}
/// Two bundles under one ref at push time: the push reports the under-lock
/// multiple-bundles error and leaves both bundles in place.
#[tokio::test]
async fn perform_push_under_lock_rejects_two_bundles_seen_under_lock() {
    let key_a = format!(
        "repo/refs/heads/main/{}.bundle",
        "1111111111111111111111111111111111111111"
    );
    let key_b = format!(
        "repo/refs/heads/main/{}.bundle",
        "2222222222222222222222222222222222222222"
    );
    let store = MockStore::new();
    store.insert(&key_a, Bytes::from_static(b"bundle_a"));
    store.insert(&key_b, Bytes::from_static(b"bundle_b"));

    let outcome = perform_push_under_lock(
        &store,
        Some("repo"),
        BackendKind::S3,
        push_state_with_pre_existing(None),
    )
    .await
    .unwrap();

    let is_multi_bundle = matches!(
        &outcome,
        PushOutcome::Error { message, .. }
            if message == r#""multiple bundles exist for the same ref on server. Run git-remote-object-store doctor to fix."?"#
    );
    assert!(
        is_multi_bundle,
        "expected under-lock multi-bundle error, got {outcome:?}",
    );
    assert!(store.contains(&key_a));
    assert!(store.contains(&key_b));
}
/// Happy path with a prior bundle: the new bundle is uploaded, the prior one
/// is kept, exactly one baseline tombstone naming the prior SHA is written,
/// and both `FORMAT` and `HEAD` keys exist.
#[tokio::test]
async fn perform_push_under_lock_passes_through_when_pre_existing_matches_current() {
    let store = MockStore::new();
    let prior_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
    store.insert(&prior_key, Bytes::from_static(b"old bundle"));

    let state = push_state_with_pre_existing(Some(prior_key.clone()));
    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .unwrap();
    assert!(
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
        "expected Ok(refs/heads/main), got {outcome:?}",
    );

    // New bundle landed with the local payload.
    let uploaded_key = format!("repo/refs/heads/main/{SHA}.bundle");
    let uploaded = store
        .get_bytes(&uploaded_key)
        .await
        .expect("new bundle must be uploaded at bundle_dest");
    assert_eq!(
        uploaded.as_ref(),
        b"fake bundle",
        "new bundle must contain the local payload",
    );

    // Prior bundle is retained.
    assert!(
        store.contains(&prior_key),
        "old bundle at pre_key must survive the push (deferred via tombstone)",
    );

    // Exactly one baseline tombstone under gc/.
    let tombstone_prefix = baseline_tombstone_listing_prefix(Some("repo"));
    let gc_listing = store.list("repo/gc/").await.unwrap();
    let tombstone_keys: Vec<_> = gc_listing
        .iter()
        .map(|meta| &meta.key)
        .filter(|key| key.starts_with(&tombstone_prefix))
        .collect();
    assert_eq!(
        tombstone_keys.len(),
        1,
        "exactly one baseline tombstone must be written; got keys: {:?}",
        tombstone_keys,
    );

    // The tombstone body names the ref and the prior SHA.
    let tombstone_body = store.get_bytes(tombstone_keys[0]).await.unwrap();
    let tombstone: serde_json::Value = serde_json::from_slice(&tombstone_body).unwrap();
    assert_eq!(
        tombstone["ref_name"].as_str(),
        Some("refs/heads/main"),
        "tombstone must name the pushed ref",
    );
    assert_eq!(
        tombstone["sha"].as_str(),
        Some(OTHER_SHA),
        "tombstone must name the prior SHA so `gc sweep` reclaims OTHER_SHA.bundle",
    );

    assert!(
        store.contains("repo/FORMAT"),
        "FORMAT key must be written by the push",
    );
    assert!(
        store.contains("repo/HEAD"),
        "HEAD key must be written by the push",
    );
}
/// With both the tombstone PUT and the prior-bundle DELETE failing, the push
/// still succeeds: the new bundle is uploaded, both faults are consumed, the
/// prior bundle remains, and no baseline tombstone exists.
#[tokio::test]
async fn perform_push_under_lock_succeeds_when_prior_cleanup_fails() {
    use crate::object_store::mock::Fault;

    let store = MockStore::new();
    let prior_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
    store.insert(&prior_key, Bytes::from_static(b"old bundle"));

    // Fail the tombstone write, then fail the delete of the prior bundle.
    let tombstone_fault = Fault::NetworkOnPutBytesPrefix {
        prefix: baseline_tombstone_listing_prefix(Some("repo")),
    };
    store.arm(tombstone_fault);
    let delete_fault = Fault::NetworkOnDelete {
        key: prior_key.clone(),
    };
    store.arm(delete_fault);

    let state = push_state_with_pre_existing(Some(prior_key.clone()));
    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .expect("push must succeed even when prior-bundle cleanup fails");
    assert!(
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
        "expected Ok(refs/heads/main), got {outcome:?}",
    );

    let uploaded_key = format!("repo/refs/heads/main/{SHA}.bundle");
    assert!(
        store.contains(&uploaded_key),
        "new bundle must be uploaded at bundle_dest",
    );
    assert_eq!(store.pending_faults(), 0);
    assert!(
        store.contains(&prior_key),
        "cleanup faults must leave the prior bundle in place",
    );

    let gc_listing = store.list("repo/gc/").await.unwrap();
    let tombstone_prefix = baseline_tombstone_listing_prefix(Some("repo"));
    let tombstone_exists = gc_listing
        .iter()
        .any(|meta| meta.key.starts_with(&tombstone_prefix));
    assert!(
        !tombstone_exists,
        "no baseline tombstone must remain after a failed PUT",
    );
}
/// With only the tombstone PUT failing, the push succeeds, no tombstone key
/// exists, the prior bundle is gone, and the new bundle is uploaded.
#[tokio::test]
async fn perform_push_under_lock_falls_back_to_sync_delete_on_tombstone_put_failure() {
    use crate::object_store::mock::Fault;

    let store = MockStore::new();
    let prior_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
    store.insert(&prior_key, Bytes::from_static(b"old bundle"));
    let tombstone_fault = Fault::NetworkOnPutBytesPrefix {
        prefix: baseline_tombstone_listing_prefix(Some("repo")),
    };
    store.arm(tombstone_fault);

    let state = push_state_with_pre_existing(Some(prior_key.clone()));
    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .expect("push must succeed when tombstone PUT fails but fallback delete succeeds");
    assert!(
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
        "expected Ok(refs/heads/main), got {outcome:?}",
    );
    assert_eq!(store.pending_faults(), 0);

    let gc_listing = store.list("repo/gc/").await.unwrap();
    let tombstone_prefix = baseline_tombstone_listing_prefix(Some("repo"));
    let tombstone_exists = gc_listing
        .iter()
        .any(|meta| meta.key.starts_with(&tombstone_prefix));
    assert!(
        !tombstone_exists,
        "tombstone PUT failed, so no baseline-tomb key may exist",
    );

    let prior_still_present = store.contains(&prior_key);
    assert!(
        !prior_still_present,
        "fallback synchronous delete must reclaim the prior bundle",
    );

    let uploaded_key = format!("repo/refs/heads/main/{SHA}.bundle");
    assert!(
        store.contains(&uploaded_key),
        "new bundle must be uploaded at bundle_dest",
    );
}
/// A network failure on the `repo.zip` upload does not fail the push: the
/// bundle is uploaded, the fault is consumed, and the zip key is absent.
#[tokio::test]
async fn perform_push_under_lock_succeeds_when_zip_upload_fails() {
    use crate::object_store::mock::Fault;

    let store = MockStore::new();
    let remote_ref = rn("refs/heads/main");

    // Fake bundle on disk.
    let bundle_dir = tempfile::Builder::new()
        .prefix("test_push_")
        .tempdir()
        .unwrap();
    let bundle_file = bundle_dir.path().join("bundle");
    std::fs::write(&bundle_file, b"fake bundle").unwrap();

    // Fake zip archive on disk.
    let zip_dir = tempfile::Builder::new()
        .prefix("test_zip_")
        .tempdir()
        .unwrap();
    let zip_file = zip_dir.path().join("repo.zip");
    std::fs::write(&zip_file, b"fake zip body").unwrap();

    let zip_artifacts = ZipArtifacts {
        archive_path: zip_file,
        short_sha: "deadbeef".to_owned(),
        commit_msg: "test commit".to_owned(),
        _tempdir: zip_dir,
    };
    let state = PushReadyState {
        remote_ref,
        local_sha: Sha::from_hex(SHA).unwrap(),
        pre_existing: None,
        bundle_path: bundle_file,
        zip_artifacts: Some(zip_artifacts),
        engine: StorageEngine::Bundle,
        force: false,
        pre_existing_was_ancestor: true,
        local_spec: "refs/heads/main".to_owned(),
        hidden_bundles: HashSet::new(),
        _temp_dir: bundle_dir,
    };

    let zip_key = "repo/refs/heads/main/repo.zip".to_owned();
    store.arm(Fault::NetworkOnPutPath {
        key: zip_key.clone(),
    });

    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .expect("push must succeed even when zip upload fails");
    assert!(
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
        "expected Ok(refs/heads/main), got {outcome:?}",
    );

    let uploaded_key = format!("repo/refs/heads/main/{SHA}.bundle");
    assert!(
        store.contains(&uploaded_key),
        "new bundle must be uploaded at bundle_dest",
    );
    assert_eq!(store.pending_faults(), 0);

    let zip_present = store.contains(&zip_key);
    assert!(
        !zip_present,
        "zip key must be absent when the upload fault fires",
    );
}
/// On the S3 backend the uploaded zip carries the
/// `codepipeline-artifact-revision-summary` metadata entry holding the commit
/// message.
#[tokio::test]
async fn perform_push_under_lock_emits_codepipeline_metadata_on_s3() {
    let (store, zip_key) = run_zip_push(BackendKind::S3).await;
    let zip_meta = store.metadata(&zip_key).expect("zip stored");

    let revision_summary = zip_meta
        .user_metadata
        .iter()
        .find(|(name, _)| name == "codepipeline-artifact-revision-summary")
        .expect("S3 push must attach the CodePipeline revision-summary metadata");

    assert_eq!(revision_summary.1, "test commit");
}
/// On the Azure backend the stored zip must carry no user metadata at all.
#[tokio::test]
async fn perform_push_under_lock_omits_codepipeline_metadata_on_azure() {
    let (store, zip_key) = run_zip_push(BackendKind::Azure).await;
    let zip_meta = store.metadata(&zip_key).expect("zip stored");
    let entries = &zip_meta.user_metadata;
    assert!(
        entries.is_empty(),
        "Azure push must not attach hyphenated CodePipeline metadata; \
         got {entries:?}",
    );
}
/// Shared driver for the zip-metadata tests.
///
/// Pushes a fake bundle plus a fake zip archive to a fresh [`MockStore`] using
/// the given backend `kind`, asserts the push succeeded and the zip landed,
/// and returns the store together with the zip's object key.
async fn run_zip_push(kind: BackendKind) -> (MockStore, String) {
    // Fake bundle on disk; its temp dir is moved into the state below.
    let bundle_dir = tempfile::Builder::new()
        .prefix("test_push_")
        .tempdir()
        .unwrap();
    let bundle_file = bundle_dir.path().join("bundle");
    std::fs::write(&bundle_file, b"fake bundle").unwrap();

    // Fake zip archive in a separate temp dir owned by the artifacts struct.
    let zip_dir = tempfile::Builder::new()
        .prefix("test_zip_")
        .tempdir()
        .unwrap();
    let zip_file = zip_dir.path().join("repo.zip");
    std::fs::write(&zip_file, b"fake zip body").unwrap();

    let zip_artifacts = ZipArtifacts {
        archive_path: zip_file,
        short_sha: String::from("deadbeef"),
        commit_msg: String::from("test commit"),
        _tempdir: zip_dir,
    };
    let state = PushReadyState {
        remote_ref: rn("refs/heads/main"),
        local_sha: Sha::from_hex(SHA).unwrap(),
        pre_existing: None,
        bundle_path: bundle_file,
        zip_artifacts: Some(zip_artifacts),
        engine: StorageEngine::Bundle,
        force: false,
        pre_existing_was_ancestor: true,
        local_spec: String::from("refs/heads/main"),
        hidden_bundles: HashSet::new(),
        _temp_dir: bundle_dir,
    };

    let store = MockStore::new();
    let outcome = perform_push_under_lock(&store, Some("repo"), kind, state)
        .await
        .expect("push must succeed");
    assert!(matches!(outcome, PushOutcome::Ok { .. }));

    let zip_dest = String::from("repo/refs/heads/main/repo.zip");
    assert!(
        store.contains(&zip_dest),
        "zip artifact must land on bucket"
    );
    (store, zip_dest)
}
#[tokio::test]
async fn delete_remote_ref_under_lock_reports_not_found_when_only_lock_present() {
let store = MockStore::new();
let lock_key = "repo/refs/heads/main/LOCK#.lock";
store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));
let r = rn("refs/heads/main");
let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &r, false, lock_key)
.await
.unwrap();
match outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(message, r#""not found"?"#);
}
PushOutcome::Ok { .. } => panic!("expected Error, got Ok"),
}
assert!(
store.contains(lock_key),
"delete_remote_ref_under_lock must NOT delete the held lock key",
);
}
/// A bundle present under the ref prefix when the delete runs must be removed
/// by the delete's own listing, while the held lock key is left untouched.
#[tokio::test]
async fn delete_remote_ref_under_lock_sweeps_concurrently_landed_bundle() {
    let lock_key = "repo/refs/heads/main/LOCK#.lock";
    let landed_bundle = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");

    let store = MockStore::new();
    store.insert(&landed_bundle, Bytes::from_static(b"new"));
    store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));

    let target = rn("refs/heads/main");
    let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &target, false, lock_key)
        .await
        .unwrap();

    let expected = PushOutcome::Ok {
        remote_ref: "refs/heads/main".into(),
    };
    assert_eq!(outcome, expected);
    assert!(
        !store.contains(&landed_bundle),
        "concurrently-landed bundle must be swept by the under-lock listing",
    );
    assert!(
        store.contains(lock_key),
        "held lock must survive the sweep (release_lock removes it)",
    );
}
/// A lock object dated 120 s in the past is acquired against with a 60 s
/// duration argument (presumably the lock TTL — confirm against
/// `acquire_lock`). The retry `put_if_absent` is made to report contention, so
/// no guard may be returned; the test also checks the key `"k"` is absent
/// afterwards.
#[tokio::test]
async fn acquire_lock_stale_retry_loses_second_race() {
    use crate::object_store::mock::Fault;

    let now = OffsetDateTime::now_utc();
    let stale_mtime = now - Duration::seconds(120);

    let mock = MockStore::new();
    mock.insert_with("k", Bytes::new(), stale_mtime, PutOpts::default());
    mock.arm(Fault::ContendedPutIfAbsent { key: "k".into() });

    let shared = Arc::new(mock);
    let as_dyn = Arc::clone(&shared) as Arc<dyn ObjectStore>;
    let guard = acquire_lock(as_dyn, "k", Duration::seconds(60), now)
        .await
        .unwrap();

    assert!(guard.is_none(), "expected contention on the retry race");
    // The contention fault must have fired exactly once.
    assert_eq!(shared.pending_faults(), 0);
    assert!(!shared.contains("k"));
}
/// `full_error_chain` must render a `PushError::Store(Network(_))` with the
/// inner source text appearing exactly once.
#[test]
fn full_error_chain_deduplicates_inlined_source_text() {
    let io_source = std::io::Error::other("dns failure");
    let boxed: crate::object_store::BoxError = Box::new(io_source);
    let err = PushError::Store(ObjectStoreError::Network(boxed));

    let rendered = full_error_chain(&err);

    assert_eq!(
        rendered, "object-store error during push: network error: dns failure",
        "PushError::Store(Network(_)) must not duplicate the inner source",
    );
}
/// Test double wrapping a [`MockStore`] that additionally records every
/// `put_path` call it receives.
#[derive(Default)]
struct RecordingPutPathStore {
    /// Store that actually services every operation.
    inner: MockStore,
    /// One `(key, opts.progress.is_some())` entry per `put_path` call, in
    /// call order.
    put_path_progress_seen: std::sync::Mutex<Vec<(String, bool)>>,
}

impl RecordingPutPathStore {
    /// Returns a cloned snapshot of the recorded `put_path` observations.
    ///
    /// Panics with `"observation lock"` if the mutex is poisoned.
    fn observed(&self) -> Vec<(String, bool)> {
        let seen = self
            .put_path_progress_seen
            .lock()
            .expect("observation lock");
        seen.iter().cloned().collect()
    }
}
/// Every method forwards to `self.inner`; `put_path` also records whether the
/// caller supplied a progress sink before forwarding.
#[async_trait::async_trait]
impl ObjectStore for RecordingPutPathStore {
    // --- read side: pure delegation ---

    async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
        self.inner.list(prefix).await
    }

    async fn get_to_file(
        &self,
        key: &str,
        dest: &Path,
        opts: crate::object_store::GetOpts,
    ) -> Result<(), ObjectStoreError> {
        self.inner.get_to_file(key, dest, opts).await
    }

    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
        self.inner.get_bytes(key).await
    }

    async fn get_bytes_range(
        &self,
        key: &str,
        range: std::ops::Range<u64>,
    ) -> Result<Bytes, ObjectStoreError> {
        self.inner.get_bytes_range(key, range).await
    }

    // --- write side ---

    async fn put_bytes(
        &self,
        key: &str,
        body: Bytes,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        self.inner.put_bytes(key, body, opts).await
    }

    /// Records `(key, opts.progress.is_some())`, then delegates the upload.
    async fn put_path(
        &self,
        key: &str,
        src: &Path,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        let carried_sink = opts.progress.is_some();
        // The guard is a temporary of this statement, so the mutex is released
        // before the `.await` below.
        self.put_path_progress_seen
            .lock()
            .expect("observation lock")
            .push((key.to_owned(), carried_sink));
        self.inner.put_path(key, src, opts).await
    }

    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
        self.inner.put_if_absent(key, body).await
    }

    // --- metadata / maintenance: pure delegation ---

    async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
        self.inner.head(key).await
    }

    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
        self.inner.copy(src, dst).await
    }

    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
        self.inner.delete(key).await
    }
}
/// Every `put_path` issued by a bundle-only push must carry a progress sink
/// in its `PutOpts` (issue #55), and at least one such call must happen.
#[tokio::test]
async fn perform_push_under_lock_attaches_progress_sink_to_bundle_put_path() {
    // Fake bundle on disk; no zip artifacts for this push.
    let bundle_dir = tempfile::Builder::new()
        .prefix("test_push_progress_")
        .tempdir()
        .unwrap();
    let bundle_file = bundle_dir.path().join("bundle");
    std::fs::write(&bundle_file, b"fake bundle").unwrap();

    let state = PushReadyState {
        remote_ref: rn("refs/heads/main"),
        local_sha: Sha::from_hex(SHA).unwrap(),
        pre_existing: None,
        bundle_path: bundle_file,
        zip_artifacts: None,
        engine: StorageEngine::Bundle,
        force: false,
        pre_existing_was_ancestor: true,
        local_spec: String::from("refs/heads/main"),
        hidden_bundles: HashSet::new(),
        _temp_dir: bundle_dir,
    };

    let store = RecordingPutPathStore::default();
    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .unwrap();
    assert!(
        matches!(outcome, PushOutcome::Ok { .. }),
        "expected Ok outcome",
    );

    let calls = store.observed();
    assert!(
        !calls.is_empty(),
        "perform_push_under_lock must call put_path for the bundle",
    );
    for (key, carried_sink) in calls {
        assert!(
            carried_sink,
            "put_path for `{key}` must carry a ProgressSink (issue #55)",
        );
    }
}
/// Smoke test: sinks built with and without a known total must accept a
/// series of reports, including `u64::MAX`, without panicking.
#[test]
fn bundle_progress_sink_accepts_reports_without_panicking() {
    // Known total: three reports of 256, 256 and 512.
    let sized = bundle_progress_sink("repo/bundle.bundle", Some(1_024));
    for chunk in [256, 256, 512] {
        sized.report(chunk);
    }

    // Unknown total: a tiny report followed by the largest possible one.
    let open_ended = bundle_progress_sink("repo/bundle.bundle", None);
    for chunk in [1, u64::MAX] {
        open_ended.report(chunk);
    }
}
/// Pins the wire-visible "not ancestor" token and verifies that the
/// production formatter, `not_ancestor_wire_message`, embeds it.
///
/// The message is obtained from the real helper rather than a hand-rolled
/// `format!`, so a change to the helper's wording is caught here.
#[test]
fn not_ancestor_token_value_is_stable() {
    assert_eq!(
        NOT_ANCESTOR_TOKEN, "not ancestor",
        "spec/{{integration,live}}/*/force_push_spec.sh asserts on this exact substring",
    );

    let formatted = not_ancestor_wire_message("refs/heads/main");
    // Exact wire form, including the surrounding quotes and trailing `?`.
    assert_eq!(
        formatted,
        r#""remote ref is not ancestor of refs/heads/main."?"#,
        "not_ancestor_wire_message must keep its exact wire format",
    );
    assert!(
        formatted.contains(NOT_ANCESTOR_TOKEN),
        "the not-ancestor PushOutcome::Error message must embed the token literally; got {formatted:?}",
    );
}
/// `sanitize_metadata_value` must turn CR/LF and NUL into spaces (a CRLF pair
/// becomes a single space in the expected output) while leaving plain,
/// non-ASCII printable, and empty input unchanged.
#[test]
fn sanitize_metadata_value_strips_control_chars() {
    // (input, expected) pairs for the control-character and pass-through cases.
    let cases = [
        ("hello\r\nX-Injected: yes", "hello X-Injected: yes"),
        ("nul\0byte", "nul byte"),
        ("plain text", "plain text"),
    ];
    for (input, expected) in cases {
        assert_eq!(sanitize_metadata_value(input), expected);
    }

    let unicode = "café — short summary";
    assert_eq!(
        sanitize_metadata_value(unicode),
        "café — short summary",
        "non-ASCII printable characters must pass through unchanged",
    );

    assert_eq!(sanitize_metadata_value(""), "");
}
/// With a `PROTECTED#` marker present, a forced push whose pre-existing bundle
/// was not an ancestor must be refused with the not-ancestor message, leaving
/// the marker and old bundle in place and uploading nothing new.
#[tokio::test]
async fn perform_push_under_lock_rejects_force_when_protected_under_lock_and_not_ff() {
    let marker_key = "repo/refs/heads/main/PROTECTED#";
    let old_bundle_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
    let new_bundle_key = format!("repo/refs/heads/main/{SHA}.bundle");

    let store = MockStore::new();
    store.insert(&old_bundle_key, Bytes::from_static(b"old bundle"));
    store.insert(marker_key, Bytes::from_static(b""));

    // Forced, non-fast-forward push over the existing bundle.
    let mut state = push_state_with_pre_existing(Some(old_bundle_key.clone()));
    state.force = true;
    state.pre_existing_was_ancestor = false;

    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .unwrap();

    let refused = matches!(
        &outcome,
        PushOutcome::Error { message, .. }
            if message == r#""remote ref is not ancestor of refs/heads/main."?"#
    );
    assert!(
        refused,
        "expected under-lock NotAncestor refusal, got {outcome:?}",
    );
    assert!(store.contains(marker_key));
    assert!(store.contains(&old_bundle_key));
    assert!(
        !store.contains(&new_bundle_key),
        "refused push must not upload the new bundle",
    );
}
/// A forced push that is actually a fast-forward must succeed even when the
/// ref carries a `PROTECTED#` marker, and the marker must remain afterwards.
#[tokio::test]
async fn perform_push_under_lock_allows_force_when_protected_under_lock_but_ff() {
    let marker_key = "repo/refs/heads/main/PROTECTED#";
    let old_bundle_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");

    let store = MockStore::new();
    store.insert(&old_bundle_key, Bytes::from_static(b"old bundle"));
    store.insert(marker_key, Bytes::from_static(b""));

    // Forced push, but the pre-existing bundle was an ancestor (fast-forward).
    let mut state = push_state_with_pre_existing(Some(old_bundle_key));
    state.force = true;
    state.pre_existing_was_ancestor = true;

    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .unwrap();

    assert!(
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
        "FF push must pass even when protected, got {outcome:?}",
    );
    assert!(store.contains(marker_key));
}
/// With no `PROTECTED#` marker in the store, a forced non-fast-forward push
/// must succeed and upload the new bundle.
#[tokio::test]
async fn perform_push_under_lock_allows_force_when_not_ancestor_and_not_protected() {
    let store = MockStore::new();
    let pre_key = format!("repo/refs/heads/main/{OTHER_SHA}.bundle");
    store.insert(&pre_key, Bytes::from_static(b"old bundle"));

    // Forced push over a bundle that is not an ancestor of the new tip.
    let mut state = push_state_with_pre_existing(Some(pre_key));
    state.force = true;
    state.pre_existing_was_ancestor = false;

    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .unwrap();

    // Requiring `Ok` here already rules out a NotAncestor error outcome, so no
    // separate inspection of an error message is needed.
    assert!(
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main"),
        "legitimate non-FF force-push must proceed, got {outcome:?}",
    );
    assert!(
        store.contains(&format!("repo/refs/heads/main/{SHA}.bundle")),
        "new bundle must be uploaded on a successful force-push",
    );
}
/// A non-forced push must succeed even though a `PROTECTED#` marker exists
/// for the ref.
#[tokio::test]
async fn perform_push_under_lock_skips_protection_check_for_non_force() {
    let store = MockStore::new();
    store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));

    // No pre-existing bundle; explicitly non-forced.
    let state = {
        let mut prepared = push_state_with_pre_existing(None);
        prepared.force = false;
        prepared.pre_existing_was_ancestor = false;
        prepared
    };

    let outcome = perform_push_under_lock(&store, Some("repo"), BackendKind::S3, state)
        .await
        .unwrap();

    let accepted =
        matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == "refs/heads/main");
    assert!(
        accepted,
        "non-force push must pass regardless of protection: {outcome:?}",
    );
}
/// Issue #151 regression: deleting a ref that has one bundle and a held lock
/// must report `Ok`, remove the bundle, and leave the lock key in place.
#[tokio::test]
async fn issue_151_clean_delete_passes_post_sweep_verification() {
    let lock_key = "repo/refs/heads/main/LOCK#.lock";
    let bundle_key = format!("repo/refs/heads/main/{SHA}.bundle");

    let store = MockStore::new();
    store.insert(&bundle_key, Bytes::from_static(b"b"));
    store.insert(lock_key, Bytes::from_static(b"held-lock-payload"));

    let target = rn("refs/heads/main");
    let outcome = delete_remote_ref_under_lock(&store, Some("repo"), &target, false, lock_key)
        .await
        .unwrap();

    let expected = PushOutcome::Ok {
        remote_ref: "refs/heads/main".into(),
    };
    assert_eq!(
        outcome, expected,
        "clean delete must report ok after the post-sweep probe",
    );
    assert!(!store.contains(&bundle_key), "bundle must be swept");
    assert!(
        store.contains(lock_key),
        "lock survives the sweep (release removes it)",
    );
}
/// Arms a network fault on `head()` of the `PROTECTED#` marker key, runs the
/// helper, then checks the fault was consumed (the helper probed the marker)
/// and that no marker object exists in the store afterwards.
#[tokio::test]
async fn verify_no_orphan_protected_after_delete_is_noop_when_marker_absent() {
    use crate::object_store::mock::Fault;

    let marker_key = "repo/refs/heads/main/PROTECTED#";
    let store = MockStore::new();
    store.arm(Fault::NetworkOnHead {
        key: marker_key.to_owned(),
    });

    let target = rn("refs/heads/main");
    verify_no_orphan_protected_after_delete(&store, Some("repo"), &target).await;

    let unconsumed = store.pending_faults();
    assert_eq!(
        unconsumed,
        0,
        "helper must call head() on the marker key — fault unconsumed",
    );
    assert!(
        !store.contains(marker_key),
        "helper must not touch the bucket",
    );
}
/// When an orphan `PROTECTED#` marker exists, the helper must leave it in the
/// store rather than deleting it.
#[tokio::test]
async fn verify_no_orphan_protected_after_delete_does_not_mutate_when_marker_present() {
    let marker_key = "repo/refs/heads/main/PROTECTED#";
    let store = MockStore::new();
    store.insert(marker_key, Bytes::new());

    let target = rn("refs/heads/main");
    verify_no_orphan_protected_after_delete(&store, Some("repo"), &target).await;

    let marker_survived = store.contains(marker_key);
    assert!(
        marker_survived,
        "helper must not delete the orphan marker — surveillance only",
    );
}
}