use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bytes::Bytes;
use time::OffsetDateTime;
use tracing::{debug, info, warn};
use crate::git::{self, PeeledTip, RefName, Sha};
use crate::keys;
use crate::object_store::{ObjectStore, ObjectStoreError, PutOpts};
use crate::protocol::push::{
self as bundle_push, DELETE_PROTECTION_MESSAGE, LockGuard, PushError, PushOutcome, PushSpec,
acquire_lock, bundle_progress_sink, delete_idempotent, head_key, is_protected, lock_key,
lock_ttl_from_env, not_ancestor_wire_message, parse_push_args, ref_listing_prefix,
verify_no_orphan_protected_after_delete,
};
use crate::url::StorageEngine;
use super::PackchainError;
use super::gc::{try_write_baseline_tombstone, write_baseline_tombstone_best_effort};
use super::keys::{chain_key, pack_idx_key, pack_key, path_index_key};
use super::manifest::{load_chain, next_manifest, write_chain, write_path_index};
use super::pack::{BuiltPack, build_baseline_pack, build_incremental_pack};
use super::schema::{ChainManifest, ChainSegment, Sha40};
struct PushConfig {
engine: StorageEngine,
ttl: time::Duration,
}
enum PrepareOutcome {
Ready(Box<ReadyState>),
Done(PushOutcome),
}
struct ReadyState {
remote_ref: RefName,
local_sha: Sha,
local_sha40: Sha40,
cwd: PathBuf,
prior: Option<ChainManifest>,
pack_content_sha: Sha40,
pack_bytes: u64,
force: bool,
prior_was_ancestor: bool,
local_spec: String,
_temp_dir: tempfile::TempDir,
}
#[derive(Debug, Clone, Copy)]
enum GitProbeError {
LocalRefNotFound,
NotAncestor,
Shallow,
}
struct LocalGit {
local_sha: Sha,
peeled: PeeledTip,
prior_commit: Option<Sha>,
cwd: PathBuf,
prior_was_ancestor: bool,
}
pub(crate) async fn push_batch(
ctx: &super::super::protocol::BatchCtx,
engine: StorageEngine,
cmds: Vec<String>,
) -> Result<Vec<PushOutcome>, PushError> {
if cmds.is_empty() {
return Ok(Vec::new());
}
debug!(count = cmds.len(), engine = %engine, "processing packchain push batch");
let config = PushConfig {
engine,
ttl: lock_ttl_from_env(),
};
let mut outcomes = Vec::with_capacity(cmds.len());
for cmd in cmds {
let spec = parse_push_args(&cmd)?;
let remote_ref_str = spec.remote_ref.as_str().to_owned();
let outcome = match push_one(
Arc::clone(&ctx.store),
ctx.prefix.as_deref(),
ctx.repo_dir.as_path(),
&config,
OffsetDateTime::now_utc(),
spec,
)
.await
{
Ok(o) => o,
Err(e)
if matches!(
e,
PushError::Store(_)
| PushError::Git(_)
| PushError::Io(_)
| PushError::Sha(_)
| PushError::Packchain(_)
) =>
{
let chain = full_error_chain(&e);
warn!(ref_name = %remote_ref_str, error = %chain, "packchain push ref failed");
PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(r#""{chain}"?"#),
}
}
Err(e) => return Err(e),
};
outcomes.push(outcome);
}
Ok(outcomes)
}
fn full_error_chain(err: &PushError) -> String {
let mut msg = err.to_string();
crate::protocol::append_source_chain(&mut msg, err);
msg
}
async fn push_one(
store: Arc<dyn ObjectStore>,
prefix: Option<&str>,
repo_dir: &Path,
config: &PushConfig,
now: OffsetDateTime,
spec: PushSpec,
) -> Result<PushOutcome, PushError> {
let state = match prepare_push(Arc::clone(&store), prefix, repo_dir, config, now, spec).await? {
PrepareOutcome::Done(o) => return Ok(o),
PrepareOutcome::Ready(s) => s,
};
let remote_ref_str = state.remote_ref.as_str().to_owned();
let lock = lock_key(prefix, &state.remote_ref);
let Some(guard) = acquire_lock(Arc::clone(&store), &lock, config.ttl, now).await? else {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""failed to acquire ref lock at {lock}. Another client may be pushing. If this persists beyond {}s, run git-remote-object-store doctor to inspect and optionally clear stale locks."?"#,
config.ttl.whole_seconds(),
),
});
};
let result = perform_push_under_lock(store.as_ref(), prefix, config.engine, *state).await;
let release_result = bundle_push::release_lock(guard).await;
match (&result, release_result) {
(Ok(PushOutcome::Ok { .. }), Err(e)) => {
warn!(key = %lock, error = %e, "packchain failed to release lock");
Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""failed to release lock. You may need to manually remove the lock {lock} from the server or use git-remote-object-store doctor to fix."?"#,
),
})
}
(_, Err(e)) => {
warn!(key = %lock, error = %e, "packchain lock release failed (push already errored)");
result
}
_ => result,
}
}
async fn prepare_push(
store: Arc<dyn ObjectStore>,
prefix: Option<&str>,
repo_dir: &Path,
config: &PushConfig,
now: OffsetDateTime,
spec: PushSpec,
) -> Result<PrepareOutcome, PushError> {
let PushSpec {
force,
local_spec,
remote_ref,
} = spec;
let remote_ref_str = remote_ref.as_str().to_owned();
if local_spec.is_empty() {
let outcome = delete_remote_ref_packchain(store, prefix, &remote_ref, config, now).await?;
return Ok(PrepareOutcome::Done(outcome));
}
let store_ref = store.as_ref();
let force_push = force;
debug!(local = %local_spec, remote = %remote_ref, force_push, "packchain push");
let prior = load_chain(store_ref, prefix, &remote_ref)
.await
.map_err(PushError::Packchain)?;
let prior_tip_sha: Option<Sha> = match prior.as_ref() {
Some(c) => Some(Sha::from_hex(c.tip.as_str()).map_err(PushError::Sha)?),
None => None,
};
let probe = local_git_work_packchain(repo_dir, &local_spec, prior_tip_sha, force_push)?;
let local = match probe {
Ok(local) => local,
Err(probe_err) => {
return Ok(PrepareOutcome::Done(probe_error_to_outcome(
probe_err,
remote_ref_str,
&local_spec,
)));
}
};
let local_sha40 =
Sha40::from_oid(local.local_sha.as_object_id()).map_err(PushError::Packchain)?;
if !force_push && prior.as_ref().map(|c| &c.tip) == Some(&local_sha40) {
info!(
ref_name = %remote_ref,
tip = %local_sha40.as_str(),
"packchain push: same tip already on bucket, no-op",
);
return Ok(PrepareOutcome::Done(PushOutcome::Ok {
remote_ref: remote_ref_str,
}));
}
let temp_dir = tempfile::Builder::new()
.prefix("git_remote_object_store_packchain_")
.tempdir()?;
let local_sha = local.local_sha;
let kind = match (force_push, local.prior_commit) {
(true, _) | (false, None) => PackKind::Baseline,
(false, Some(prior_commit)) => PackKind::Incremental { prior_commit },
};
let (pack, baseline_bundle) = build_pack_and_baseline(
local.cwd.clone(),
temp_dir.path().to_owned(),
local_sha,
local.peeled,
kind,
local_spec.clone(),
)
.await?;
upload_pack_idx_baseline(
store_ref,
prefix,
&remote_ref,
local_sha,
&pack,
baseline_bundle.as_deref(),
)
.await?;
Ok(PrepareOutcome::Ready(Box::new(ReadyState {
remote_ref,
local_sha,
local_sha40,
cwd: local.cwd,
prior,
pack_content_sha: pack.content_sha,
pack_bytes: pack.pack_bytes,
force: force_push,
prior_was_ancestor: local.prior_was_ancestor,
local_spec,
_temp_dir: temp_dir,
})))
}
fn probe_error_to_outcome(
err: GitProbeError,
remote_ref_str: String,
local_spec: &str,
) -> PushOutcome {
let message = match err {
GitProbeError::LocalRefNotFound => format!(r#""{local_spec} not found"?"#),
GitProbeError::NotAncestor => not_ancestor_wire_message(local_spec),
GitProbeError::Shallow => {
r#""cannot push from a shallow clone: rev-walk crosses a shallow boundary"?"#.to_owned()
}
};
PushOutcome::Error {
remote_ref: remote_ref_str,
message,
}
}
#[derive(Debug, Clone, Copy)]
enum PackKind {
Baseline,
Incremental { prior_commit: Sha },
}
async fn build_pack_and_baseline(
cwd: PathBuf,
temp_path: PathBuf,
local_sha: Sha,
peeled: PeeledTip,
kind: PackKind,
local_spec: String,
) -> Result<(BuiltPack, Option<PathBuf>), PushError> {
let result = tokio::task::spawn_blocking(move || {
let (pack, needs_baseline) = match kind {
PackKind::Baseline => (build_baseline_pack(&cwd, peeled, &temp_path)?, true),
PackKind::Incremental { prior_commit } => {
let PeeledTip::Commit {
commit: local_commit,
tag_chain,
} = peeled
else {
return Err(PackchainError::PackBuild(
"incremental pack requires commit-tipped peel; non-commit peel reached \
build_pack_and_baseline — push dispatch is buggy"
.to_owned(),
));
};
(
build_incremental_pack(
&cwd,
prior_commit,
local_commit,
&tag_chain,
&temp_path,
)?,
false,
)
}
};
let baseline = if needs_baseline {
let bundle_path = crate::bundle::create(&cwd, &temp_path, local_sha, &local_spec)
.map_err(|e| PackchainError::PackBuild(format!("baseline bundle: {e}")))?;
Some(bundle_path)
} else {
None
};
Ok::<_, PackchainError>((pack, baseline))
})
.await
.map_err(|join_err| std::io::Error::other(join_err.to_string()))?;
result.map_err(PushError::Packchain)
}
fn local_git_work_packchain(
repo_dir: &Path,
local_spec: &str,
prior_tip: Option<Sha>,
force_push: bool,
) -> Result<Result<LocalGit, GitProbeError>, PushError> {
let repo = gix::open(repo_dir).map_err(|e| PushError::Git(crate::git::GitError::from(e)))?;
let cwd = repo.workdir().unwrap_or_else(|| repo.git_dir()).to_owned();
let Ok(local_sha) = git::branch::resolve(&repo, local_spec) else {
return Ok(Err(GitProbeError::LocalRefNotFound));
};
let peeled = git::peel_tag_chain(&repo, local_sha).map_err(PushError::Git)?;
let local_commit = match &peeled {
PeeledTip::Commit { commit, .. } => Some(*commit),
PeeledTip::Tree { .. } | PeeledTip::Blob { .. } => None,
};
let (prior_commit, prior_was_ancestor) = match prior_tip {
None => (None, true),
Some(prior) => match (local_commit, git::peel_tag_chain(&repo, prior)) {
(Some(local_commit_oid), Ok(PeeledTip::Commit { commit, .. })) => {
let ancestor =
git::is_ancestor(&repo, commit, local_commit_oid).map_err(PushError::Git)?;
if !force_push && !ancestor {
return Ok(Err(GitProbeError::NotAncestor));
}
let prior_commit = (!force_push && ancestor).then_some(commit);
(prior_commit, ancestor)
}
(None, _)
| (
_,
Ok(PeeledTip::Tree { .. } | PeeledTip::Blob { .. })
| Err(crate::git::GitError::FindObject(_)),
) => {
if !force_push {
return Ok(Err(GitProbeError::NotAncestor));
}
(None, false)
}
(_, Err(e)) => return Err(PushError::Git(e)),
},
};
if let Some(local_commit_oid) = local_commit
&& rev_walk_crosses_shallow_boundary(&repo, local_commit_oid)
.map_err(PushError::Packchain)?
{
return Ok(Err(GitProbeError::Shallow));
}
drop(repo);
Ok(Ok(LocalGit {
local_sha,
peeled,
prior_commit,
cwd,
prior_was_ancestor,
}))
}
fn rev_walk_crosses_shallow_boundary(
repo: &gix::Repository,
tip: Sha,
) -> Result<bool, PackchainError> {
let Some(commits) = repo
.shallow_commits()
.map_err(|e| PackchainError::PackBuild(format!("read .git/shallow: {e}")))?
else {
return Ok(false);
};
let boundary: HashSet<gix_hash::ObjectId> = commits.iter().copied().collect();
let walker = repo
.rev_walk([*tip.as_object_id()])
.all()
.map_err(|e| PackchainError::PackBuild(format!("rev-walk for shallow check: {e}")))?;
for info in walker {
let info = info.map_err(|e| PackchainError::PackBuild(format!("rev-walk step: {e}")))?;
if boundary.contains(&info.id) {
return Ok(true);
}
}
Ok(false)
}
async fn perform_push_under_lock(
store: &dyn ObjectStore,
prefix: Option<&str>,
engine: StorageEngine,
state: ReadyState,
) -> Result<PushOutcome, PushError> {
let ReadyState {
remote_ref,
local_sha,
local_sha40,
cwd,
prior,
pack_content_sha,
pack_bytes,
force,
prior_was_ancestor,
local_spec,
_temp_dir,
} = state;
let remote_ref_str = remote_ref.as_str().to_owned();
if force && !prior_was_ancestor && is_protected(store, prefix, &remote_ref).await? {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: not_ancestor_wire_message(&local_spec),
});
}
let current = load_chain(store, prefix, &remote_ref)
.await
.map_err(PushError::Packchain)?;
if !force {
let pre_tip = prior.as_ref().map(|c| &c.tip);
let cur_tip = current.as_ref().map(|c| &c.tip);
if pre_tip != cur_tip {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: r#""stale chain. Please fetch and retry."?"#.to_owned(),
});
}
}
let path_index = tokio::task::spawn_blocking(move || -> Result<_, PackchainError> {
let repo = gix::open(&cwd).map_err(crate::git::GitError::from)?;
let peeled = git::peel_tag_chain(&repo, local_sha).map_err(PackchainError::Git)?;
super::git::extract_path_index(&repo, &peeled, local_sha)
})
.await
.map_err(|join_err| std::io::Error::other(join_err.to_string()))?
.map_err(PushError::Packchain)?;
let format_key = keys::join(prefix, "FORMAT");
store
.put_if_absent(&format_key, Bytes::from_static(engine.as_str().as_bytes()))
.await?;
let head = head_key(prefix);
store
.put_if_absent(
&head,
Bytes::copy_from_slice(remote_ref.as_str().as_bytes()),
)
.await?;
let new_segment = ChainSegment {
sha: local_sha40.clone(),
parent_sha: None, pack: pack_key(None, &pack_content_sha),
bytes: pack_bytes,
};
let manifest = next_manifest(prior.as_ref(), &local_sha40, new_segment, force);
write_chain(store, prefix, &remote_ref, &manifest)
.await
.map_err(PushError::Packchain)?;
match path_index.as_ref() {
Some(index) => write_path_index(store, prefix, &remote_ref, index)
.await
.map_err(PushError::Packchain)?,
None => delete_idempotent(store, &path_index_key(prefix, &remote_ref)).await?,
}
if force {
force_push_baseline_cleanup(store, prefix, &remote_ref, prior.as_ref(), &local_sha40).await;
}
Ok(PushOutcome::Ok {
remote_ref: remote_ref_str,
})
}
async fn upload_pack_idx_baseline(
store: &dyn ObjectStore,
prefix: Option<&str>,
remote_ref: &RefName,
local_sha: Sha,
pack: &BuiltPack,
baseline_bundle: Option<&Path>,
) -> Result<(), PushError> {
let pack_dest = pack_key(prefix, &pack.content_sha);
upload_with_progress(store, &pack_dest, &pack.pack_path, Some(pack.pack_bytes)).await?;
let idx_dest = pack_idx_key(prefix, &pack.content_sha);
upload_with_progress(
store,
&idx_dest,
&pack.idx_path,
file_len(&pack.idx_path).await,
)
.await?;
if let Some(bundle_path) = baseline_bundle {
let bundle_dest = keys::bundle_key(prefix, remote_ref, local_sha);
upload_with_progress(
store,
&bundle_dest,
bundle_path,
file_len(bundle_path).await,
)
.await?;
}
Ok(())
}
async fn file_len(path: &Path) -> Option<u64> {
tokio::fs::metadata(path).await.map(|m| m.len()).ok()
}
async fn upload_with_progress(
store: &dyn ObjectStore,
dest_key: &str,
src: &Path,
total_hint: Option<u64>,
) -> Result<(), PushError> {
let opts = PutOpts {
progress: Some(bundle_progress_sink(dest_key, total_hint)),
..PutOpts::default()
};
store.put_path(dest_key, src, opts).await?;
Ok(())
}
async fn force_push_baseline_cleanup(
store: &dyn ObjectStore,
prefix: Option<&str>,
remote_ref: &RefName,
prior: Option<&ChainManifest>,
local_sha40: &Sha40,
) {
let Some(prior) = prior else {
return;
};
write_baseline_tombstone_best_effort(
store,
prefix,
remote_ref,
&prior.full_at,
local_sha40,
"force-push",
)
.await;
}
async fn release_lock_or_warn(guard: LockGuard, lock: &str, after: &str) {
if let Err(e) = bundle_push::release_lock(guard).await {
warn!(
key = %lock,
error = %e,
after,
"packchain delete failed to release lock",
);
}
}
async fn delete_remote_ref_packchain(
store: Arc<dyn ObjectStore>,
prefix: Option<&str>,
remote_ref: &RefName,
config: &PushConfig,
now: OffsetDateTime,
) -> Result<PushOutcome, PushError> {
let chain = chain_key(prefix, remote_ref);
let remote_ref_str = remote_ref.as_str().to_owned();
let lock = lock_key(prefix, remote_ref);
let Some(guard) = acquire_lock(Arc::clone(&store), &lock, config.ttl, now).await? else {
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""failed to acquire ref lock at {lock}. Another client may be pushing or deleting. If this persists beyond {}s, run git-remote-object-store doctor to inspect and optionally clear stale locks."?"#,
config.ttl.whole_seconds(),
),
});
};
match store.head(&chain).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound(_)) => {
release_lock_or_warn(guard, &lock, "not-found probe").await;
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: r#""not found"?"#.to_owned(),
});
}
Err(e) => {
release_lock_or_warn(guard, &lock, "chain.json probe error").await;
return Err(PushError::Store(e));
}
}
let listing = ref_listing_prefix(prefix, remote_ref);
let store_ref = store.as_ref();
let entries = match store_ref.list(&listing).await {
Ok(es) => es,
Err(e) => {
release_lock_or_warn(guard, &lock, "list failure").await;
return Err(PushError::Store(e));
}
};
if keys::entries_have_protected_marker(&entries) {
release_lock_or_warn(guard, &lock, "protection rejection").await;
return Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: DELETE_PROTECTION_MESSAGE.to_owned(),
});
}
let deferred_bundle_key =
try_write_baseline_tombstone(store_ref, prefix, remote_ref, &entries, "packchain delete")
.await;
if let Some(ref key) = deferred_bundle_key {
info!(
remote_ref = %remote_ref.as_str(),
key = %key,
"packchain delete: deferred baseline bundle delete via tombstone",
);
}
let sweep_result: Result<(), PushError> = async {
for entry in &entries {
if entry.key == lock {
continue;
}
if deferred_bundle_key.as_deref() == Some(entry.key.as_str()) {
continue;
}
delete_idempotent(store_ref, &entry.key).await?;
}
Ok(())
}
.await;
if sweep_result.is_ok() {
verify_no_orphan_protected_after_delete(store_ref, prefix, remote_ref).await;
}
let release_result = bundle_push::release_lock(guard).await;
match (sweep_result, release_result) {
(Ok(()), Ok(())) => Ok(PushOutcome::Ok {
remote_ref: remote_ref_str,
}),
(Ok(()), Err(e)) => {
warn!(key = %lock, error = %e, "packchain delete failed to release lock");
Ok(PushOutcome::Error {
remote_ref: remote_ref_str,
message: format!(
r#""failed to release lock. You may need to manually remove the lock {lock} from the server or use git-remote-object-store doctor to fix."?"#,
),
})
}
(Err(sweep_err), Err(rel_err)) => {
warn!(key = %lock, error = %rel_err, "packchain delete lock release failed (sweep already errored)");
Err(sweep_err)
}
(Err(sweep_err), Ok(())) => Err(sweep_err),
}
}
#[cfg(test)]
mod tests {
use super::super::keys::path_index_key;
use super::*;
use crate::object_store::mock::MockStore;
use crate::packchain::gc::baseline_tombstone_listing_prefix;
fn rn(s: &str) -> RefName {
RefName::new(s).unwrap()
}
fn delete_test_config() -> PushConfig {
PushConfig {
engine: StorageEngine::Packchain,
ttl: time::Duration::seconds(60),
}
}
#[tokio::test]
async fn delete_returns_not_found_when_chain_absent() {
let store = Arc::new(MockStore::new());
let remote = rn("refs/heads/main");
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
None,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
match &outcome {
PushOutcome::Error { message, .. } => {
assert_eq!(
message, r#""not found"?"#,
"wire bytes for not-found delete"
);
}
PushOutcome::Ok { .. } => panic!("expected Error, got {outcome:?}"),
}
assert!(
!store.contains(&lock_key(None, &remote)),
"lock key must NOT linger after a not-found delete",
);
}
#[tokio::test]
async fn delete_under_lock_completes_when_chain_present() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
store.insert(&chain, Bytes::from_static(b"{}"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(matches!(outcome, PushOutcome::Ok { .. }));
assert!(!store.contains(&chain));
assert!(!store.contains(&lock_key(prefix, &remote)));
}
#[tokio::test]
async fn delete_sweeps_chain_path_index_and_defers_baseline() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let baseline_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let baseline_key = keys::bundle_key(prefix, &remote, baseline_sha);
store.insert(
chain_key(prefix, &remote),
Bytes::from_static(b"{\"v\":1,\"tip\":\"0000000000000000000000000000000000000001\",\"full_at\":\"0000000000000000000000000000000000000001\",\"segments\":[]}"),
);
store.insert(path_index_key(prefix, &remote), Bytes::from_static(b"{}"));
store.insert(&baseline_key, Bytes::from_static(b"PACK"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(matches!(outcome, PushOutcome::Ok { .. }));
assert!(!store.contains(&chain_key(prefix, &remote)));
assert!(!store.contains(&path_index_key(prefix, &remote)));
assert!(
store.contains(&baseline_key),
"baseline bundle at {baseline_key} must survive synchronous delete (deferred via tombstone)",
);
let tomb_keys: Vec<String> = store
.keys()
.into_iter()
.filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
.collect();
assert_eq!(
tomb_keys.len(),
1,
"exactly one baseline tombstone must exist: {tomb_keys:?}",
);
let body = store
.get_bytes(&tomb_keys[0])
.await
.expect("tombstone body present");
let parsed: serde_json::Value =
serde_json::from_slice(&body).expect("tombstone is valid JSON");
assert_eq!(parsed["v"], 1);
assert_eq!(parsed["sha"], baseline_sha.to_string());
assert_eq!(parsed["ref_name"], "refs/heads/main");
assert!(
!store.contains(&lock_key(prefix, &remote)),
"lock key must be released after a successful delete",
);
}
#[tokio::test]
async fn delete_tombstone_is_reaped_by_gc_sweep() {
use crate::packchain::gc;
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let baseline_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let baseline_key = keys::bundle_key(prefix, &remote, baseline_sha);
store.insert(
chain_key(prefix, &remote),
Bytes::from_static(b"{\"v\":1,\"tip\":\"0000000000000000000000000000000000000001\",\"full_at\":\"0000000000000000000000000000000000000001\",\"segments\":[]}"),
);
store.insert(path_index_key(prefix, &remote), Bytes::from_static(b"{}"));
store.insert(&baseline_key, Bytes::from_static(b"PACK"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(matches!(outcome, PushOutcome::Ok { .. }));
assert!(
store.contains(&baseline_key),
"pre-condition: bundle still present after delete (deferred)",
);
let tomb_keys_pre: Vec<String> = store
.keys()
.into_iter()
.filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
.collect();
assert_eq!(
tomb_keys_pre.len(),
1,
"exactly one tombstone must exist pre-sweep: {tomb_keys_pre:?}",
);
let body = store
.get_bytes(&tomb_keys_pre[0])
.await
.expect("tombstone body present");
let parsed: serde_json::Value =
serde_json::from_slice(&body).expect("tombstone is valid JSON");
assert_eq!(parsed["v"], 1);
assert_eq!(parsed["sha"], baseline_sha.to_string());
assert_eq!(parsed["ref_name"], "refs/heads/main");
let store_ref: &dyn ObjectStore = store.as_ref();
let sweep = gc::sweep(
store_ref,
"repo",
gc::SweepOpts {
grace_hours: 0,
force: true,
},
)
.await
.expect("sweep");
assert_eq!(
sweep.swept_tombstones, 1,
"sweep must reclaim exactly the tombstone helper-protocol delete wrote",
);
assert!(
!store.contains(&baseline_key),
"baseline bundle must be deleted by sweep: surviving keys = {:?}",
store.keys(),
);
let surviving_tombs: Vec<String> = store
.keys()
.into_iter()
.filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
.collect();
assert!(
surviving_tombs.is_empty(),
"tombstone must be deleted by sweep: {surviving_tombs:?}",
);
}
#[tokio::test]
async fn delete_leaves_baseline_bundle_for_concurrent_fetch() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let baseline_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let baseline_key = keys::bundle_key(prefix, &remote, baseline_sha);
store.insert(
chain_key(prefix, &remote),
Bytes::from_static(b"{\"v\":1,\"tip\":\"0000000000000000000000000000000000000001\",\"full_at\":\"0000000000000000000000000000000000000001\",\"segments\":[]}"),
);
store.insert(path_index_key(prefix, &remote), Bytes::from_static(b"{}"));
store.insert(&baseline_key, Bytes::from_static(b"PACKBUNDLE"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(matches!(outcome, PushOutcome::Ok { .. }));
let bytes = store
.as_ref()
.get_bytes(&baseline_key)
.await
.expect("concurrent fetcher must still read the bundle within the grace window");
assert_eq!(bytes.as_ref(), b"PACKBUNDLE");
}
#[tokio::test]
async fn delete_with_unparseable_chain_falls_back_to_synchronous_bundle_delete() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let baseline_sha = Sha::from_hex("0000000000000000000000000000000000000002").unwrap();
let baseline_key = keys::bundle_key(prefix, &remote, baseline_sha);
store.insert(chain_key(prefix, &remote), Bytes::from_static(b"{}"));
store.insert(&baseline_key, Bytes::from_static(b"PACK"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(matches!(outcome, PushOutcome::Ok { .. }));
assert!(
!store.contains(&baseline_key),
"bundle must be swept synchronously when chain.json is unparseable",
);
let tomb_keys: Vec<String> = store
.keys()
.into_iter()
.filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
.collect();
assert!(
tomb_keys.is_empty(),
"no tombstone should be written on the fall-back path: {tomb_keys:?}",
);
}
#[tokio::test]
async fn delete_with_mismatched_full_at_falls_back_to_synchronous_bundle_delete() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let seeded_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let seeded_bundle_key = keys::bundle_key(prefix, &remote, seeded_sha);
store.insert(
chain_key(prefix, &remote),
Bytes::from_static(b"{\"v\":1,\"tip\":\"0000000000000000000000000000000000000002\",\"full_at\":\"0000000000000000000000000000000000000002\",\"segments\":[]}"),
);
store.insert(path_index_key(prefix, &remote), Bytes::from_static(b"{}"));
store.insert(&seeded_bundle_key, Bytes::from_static(b"PACK"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(matches!(outcome, PushOutcome::Ok { .. }));
assert!(
!store.contains(&seeded_bundle_key),
"seeded bundle must be swept synchronously when full_at points elsewhere",
);
assert!(!store.contains(&chain_key(prefix, &remote)));
assert!(!store.contains(&path_index_key(prefix, &remote)));
let tomb_keys: Vec<String> = store
.keys()
.into_iter()
.filter(|k| k.starts_with(&baseline_tombstone_listing_prefix(Some("repo"))))
.collect();
assert!(
tomb_keys.is_empty(),
"no tombstone should be written when full_at has no matching listing entry: {tomb_keys:?}",
);
assert!(
!store.contains(&lock_key(prefix, &remote)),
"lock key must be released after the fall-back delete",
);
}
#[tokio::test]
async fn delete_with_lock_held_reports_contention_and_preserves_keys() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let path_index = path_index_key(prefix, &remote);
let lock = lock_key(prefix, &remote);
store.insert(&chain, Bytes::from_static(b"{}"));
store.insert(&path_index, Bytes::from_static(b"{}"));
store.insert(&lock, Bytes::new());
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
let expected = format!(
r#""failed to acquire ref lock at {lock}. Another client may be pushing or deleting. If this persists beyond 60s, run git-remote-object-store doctor to inspect and optionally clear stale locks."?"#,
);
match &outcome {
PushOutcome::Error {
message,
remote_ref,
} => {
assert_eq!(message, &expected, "contention wire message must be exact");
assert_eq!(remote_ref, remote.as_str());
}
PushOutcome::Ok { .. } => panic!("expected contention Error, got {outcome:?}"),
}
assert!(store.contains(&chain), "chain.json must NOT be deleted");
assert!(
store.contains(&path_index),
"path-index.json must NOT be deleted",
);
assert!(
store.contains(&lock),
"foreign-held LOCK#.lock must NOT be deleted by a contending delete (#116)",
);
}
#[tokio::test]
async fn delete_sweep_excludes_lock_key() {
use crate::object_store::mock::Fault;
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let path_index = path_index_key(prefix, &remote);
let baseline_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let baseline = keys::bundle_key(prefix, &remote, baseline_sha);
let lock = lock_key(prefix, &remote);
store.insert(&chain, Bytes::from_static(b"{}"));
store.insert(&path_index, Bytes::from_static(b"{}"));
store.insert(&baseline, Bytes::from_static(b"PACK"));
store.arm(Fault::NetworkOnDelete { key: lock.clone() });
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(!store.contains(&chain), "chain.json must be swept");
assert!(
!store.contains(&path_index),
"path-index.json must be swept",
);
assert!(!store.contains(&baseline), "baseline bundle must be swept");
assert!(
store.contains(&lock),
"lock must survive the sweep — only release_lock may delete it, \
and the armed fault blocked that delete",
);
assert_eq!(
store.pending_faults(),
0,
"armed delete-fault must have fired exactly once (via release)",
);
let expected = format!(
r#""failed to release lock. You may need to manually remove the lock {lock} from the server or use git-remote-object-store doctor to fix."?"#,
);
match &outcome {
PushOutcome::Error {
message,
remote_ref,
} => {
assert_eq!(message, &expected, "release-failure wire bytes");
assert_eq!(remote_ref, remote.as_str());
}
PushOutcome::Ok { .. } => panic!(
"expected release-failure Error (sweep correctly skipped the lock, \
release tripped the armed fault), got {outcome:?}",
),
}
}
#[tokio::test]
async fn delete_recovers_stale_lock_and_completes() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let path_index = path_index_key(prefix, &remote);
let lock = lock_key(prefix, &remote);
store.insert(&chain, Bytes::from_static(b"{}"));
store.insert(&path_index, Bytes::from_static(b"{}"));
let now = OffsetDateTime::now_utc();
let stale = now - time::Duration::seconds(120);
store.insert_with(&lock, Bytes::new(), stale, PutOpts::default());
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
now,
)
.await
.unwrap();
assert!(
matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == remote.as_str()),
"stale lock must be recoverable end-to-end, got {outcome:?}",
);
assert!(!store.contains(&chain), "chain.json must be swept");
assert!(
!store.contains(&path_index),
"path-index.json must be swept",
);
assert!(
!store.contains(&lock),
"lock must be released after a successful stale-recovery delete",
);
}
#[tokio::test]
async fn delete_rejects_when_protected_marker_present_with_chain() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let protected = "repo/refs/heads/main/PROTECTED#";
store.insert(&chain, Bytes::from_static(b"{}"));
store.insert(protected, Bytes::new());
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
match &outcome {
PushOutcome::Error {
message,
remote_ref,
} => {
assert_eq!(
message, DELETE_PROTECTION_MESSAGE,
"wire bytes for protection rejection must match the bundle engine",
);
assert_eq!(remote_ref, remote.as_str());
}
PushOutcome::Ok { .. } => {
panic!("expected protection Error, got {outcome:?}")
}
}
assert!(
store.contains(&chain),
"chain.json must NOT be swept when PROTECTED# marker is present",
);
assert!(
store.contains(protected),
"PROTECTED# marker must NOT be swept by a refused delete",
);
assert!(
!store.contains(&lock_key(prefix, &remote)),
"lock must be released after a protection-rejected delete",
);
}
#[tokio::test]
async fn delete_rejects_when_protected_marker_lands_after_lock_acquire() {
let inner = MockStore::new();
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let protected = "repo/refs/heads/main/PROTECTED#";
inner.insert(&chain, Bytes::from_static(b"{}"));
let protected_key = protected.to_owned();
let store = Arc::new(PostHeadHookStore::new(inner, &chain, move |inner| {
inner.insert(&protected_key, Bytes::new());
}));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(
matches!(
&outcome,
PushOutcome::Error { message, .. }
if message == DELETE_PROTECTION_MESSAGE
),
"TOCTOU-window protect must reject with the canonical message, got {outcome:?}",
);
assert!(
store.hook_fired(),
"post-head hook must have fired on the chain.json head probe",
);
let ref_prefix = "repo/refs/heads/main/";
let surviving: Vec<String> = store
.inner
.keys()
.into_iter()
.filter(|k| k.starts_with(ref_prefix))
.collect();
assert!(
surviving.iter().any(|k| k == &chain),
"chain.json must survive a TOCTOU-window rejection, surviving = {surviving:?}",
);
assert!(
surviving.iter().any(|k| k == protected),
"PROTECTED# marker must survive a TOCTOU-window rejection, surviving = {surviving:?}",
);
}
#[tokio::test]
async fn delete_unprotected_with_chain_sweeps_as_before() {
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let path_index = path_index_key(prefix, &remote);
let baseline_sha = Sha::from_hex("0000000000000000000000000000000000000001").unwrap();
let baseline = keys::bundle_key(prefix, &remote, baseline_sha);
store.insert(&chain, Bytes::from_static(b"{}"));
store.insert(&path_index, Bytes::from_static(b"{}"));
store.insert(&baseline, Bytes::from_static(b"PACK"));
let config = delete_test_config();
let outcome = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await
.unwrap();
assert!(
matches!(&outcome, PushOutcome::Ok { remote_ref } if remote_ref == remote.as_str()),
"unprotected delete must succeed, got {outcome:?}",
);
assert!(!store.contains(&chain), "chain.json must be swept");
assert!(
!store.contains(&path_index),
"path-index.json must be swept",
);
assert!(!store.contains(&baseline), "baseline bundle must be swept");
assert!(
!store.contains(&lock_key(prefix, &remote)),
"lock must be released after a successful unprotected delete",
);
}
#[tokio::test]
async fn delete_remote_ref_packchain_releases_lock_on_list_failure() {
use crate::object_store::mock::Fault;
let store = Arc::new(MockStore::new());
let prefix = Some("repo");
let remote = rn("refs/heads/main");
let chain = chain_key(prefix, &remote);
let listing = ref_listing_prefix(prefix, &remote);
let lock = lock_key(prefix, &remote);
store.insert(&chain, Bytes::from_static(b"{}"));
store.arm(Fault::AccessDeniedOnList { prefix: listing });
let config = delete_test_config();
let result = delete_remote_ref_packchain(
Arc::clone(&store) as Arc<dyn ObjectStore>,
prefix,
&remote,
&config,
OffsetDateTime::now_utc(),
)
.await;
match result {
Err(PushError::Store(_)) => {}
Err(other) => panic!("expected PushError::Store, got {other:?}"),
Ok(outcome) => panic!("expected list-failure error, got Ok({outcome:?})"),
}
assert_eq!(
store.pending_faults(),
0,
"armed list-fault must have fired exactly once on the under-lock listing",
);
assert!(
!store.contains(&lock),
"lock key must be released even when the under-lock list call fails",
);
assert!(
store.contains(&chain),
"chain.json must NOT be swept when the listing call failed before the sweep",
);
}
fn ready_state_for_protection_test(force: bool, prior_was_ancestor: bool) -> Box<ReadyState> {
let local_sha = Sha::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap();
let local_sha40 = Sha40::from_oid(local_sha.as_object_id()).unwrap();
let pack_content_sha = Sha40::try_new("1111111111111111111111111111111111111111").unwrap();
let temp_dir = tempfile::Builder::new()
.prefix("test_packchain_push_")
.tempdir()
.unwrap();
Box::new(ReadyState {
remote_ref: rn("refs/heads/main"),
local_sha,
local_sha40,
cwd: temp_dir.path().to_owned(),
prior: None,
pack_content_sha,
pack_bytes: 0,
force,
prior_was_ancestor,
local_spec: "refs/heads/main".to_owned(),
_temp_dir: temp_dir,
})
}
#[tokio::test]
async fn perform_push_under_lock_rejects_force_when_protected_under_lock_and_not_ff() {
let store = MockStore::new();
store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
let state = ready_state_for_protection_test(true, false);
let outcome =
perform_push_under_lock(&store, Some("repo"), StorageEngine::Packchain, *state)
.await
.unwrap();
assert!(
matches!(
&outcome,
PushOutcome::Error { message, .. }
if message == r#""remote ref is not ancestor of refs/heads/main."?"#
),
"expected under-lock NotAncestor refusal, got {outcome:?}",
);
assert!(store.contains("repo/refs/heads/main/PROTECTED#"));
#[allow(clippy::case_sensitive_file_extension_comparisons)]
let is_artefact = |k: &str| {
k.ends_with(".pack")
|| k.ends_with(".idx")
|| k.ends_with("/chain.json")
|| k.ends_with("/path-index.json")
};
let ref_prefix = "repo/refs/heads/main/";
let stray: Vec<String> = store
.keys()
.into_iter()
.filter(|k| k.starts_with(ref_prefix) && is_artefact(k))
.collect();
assert!(
stray.is_empty(),
"refused push must not upload packchain artefacts, found: {stray:?}",
);
}
#[tokio::test]
async fn perform_push_under_lock_allows_force_when_not_ancestor_and_not_protected() {
let store = MockStore::new();
let state = ready_state_for_protection_test(true, false);
let outcome =
perform_push_under_lock(&store, Some("repo"), StorageEngine::Packchain, *state).await;
match outcome {
Ok(PushOutcome::Ok { .. }) => {
}
Ok(PushOutcome::Error { message, .. }) => assert!(
!message.contains("not ancestor"),
"unprotected force-push must not emit NotAncestor: {message:?}",
),
Err(e) => assert!(
!e.to_string().contains("not ancestor"),
"unprotected force-push must not emit NotAncestor: {e}",
),
}
}
#[tokio::test]
async fn perform_push_under_lock_skips_protection_check_for_non_force() {
let store = MockStore::new();
store.insert("repo/refs/heads/main/PROTECTED#", Bytes::from_static(b""));
let state = ready_state_for_protection_test(false, false);
let outcome =
perform_push_under_lock(&store, Some("repo"), StorageEngine::Packchain, *state).await;
match outcome {
Ok(PushOutcome::Ok { .. }) => {
}
Ok(PushOutcome::Error { message, .. }) => assert!(
!message.contains("not ancestor"),
"non-force push must not hit the protection rejection: {message:?}",
),
Err(e) => assert!(
!e.to_string().contains("not ancestor"),
"non-force push must not hit the protection rejection: {e}",
),
}
}
type PostHeadHook = Box<dyn FnOnce(&MockStore) + Send>;
struct PostHeadHookStore {
inner: MockStore,
hook: std::sync::Mutex<Option<PostHeadHook>>,
trigger_key: String,
}
impl PostHeadHookStore {
fn new(
inner: MockStore,
trigger_key: impl Into<String>,
hook: impl FnOnce(&MockStore) + Send + 'static,
) -> Self {
Self {
inner,
hook: std::sync::Mutex::new(Some(Box::new(hook))),
trigger_key: trigger_key.into(),
}
}
fn hook_fired(&self) -> bool {
self.hook.lock().unwrap().is_none()
}
}
crate::delegate_to_inner_impl! {
impl ObjectStore for PostHeadHookStore {
forward: list, get_to_file, get_bytes, get_bytes_range,
put_bytes, put_path, put_if_absent,
copy, delete;
async fn head(
&self,
key: &str,
) -> Result<crate::object_store::ObjectMeta, ObjectStoreError> {
let result = self.inner.head(key).await;
if result.is_ok()
&& key == self.trigger_key
&& let Some(hook) = self.hook.lock().unwrap().take()
{
hook(&self.inner);
}
result
}
}
}
}