use std::sync::atomic::Ordering;
use axum::{
http::{header, HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
use crate::{
auth, can_delete, http_semantics as hs, not_found, server_error, storage_error, store,
unauthorized, world, AuditAppendJob, AuthGate, BlockingSqliteError, Core, ErrorReason, Phase,
TraceCtx,
};
pub(crate) async fn execute_delete(
headers: HeaderMap,
tier: auth::Tier,
world: String,
core: &Core,
trace: &TraceCtx,
) -> Phase {
if !can_delete(tier) {
return Phase::Error {
resp: unauthorized("delete requires token; system worlds need approve token"),
reason: ErrorReason::Auth(AuthGate::Delete),
};
}
if world == "var/log/deletes" {
return Phase::Error {
resp: unauthorized("delete ledger is append-only"),
reason: ErrorReason::Auth(AuthGate::Delete),
};
}
let _write_guard = core.acquire_world_lock(&world).await;
trace.emit_aux_kv("lock_acquired", &format!("target={world}"));
if let Err(resp) = hs::check_write_preconditions(core, &world, &headers) {
let reason = if resp.status() == StatusCode::PRECONDITION_FAILED {
ErrorReason::PreconditionFailed
} else {
ErrorReason::StorageRead
};
return Phase::Error { resp, reason };
}
let Some(stage) = (match core.read_world(&world) {
Ok(current) => current,
Err(e) => {
return Phase::Error {
resp: storage_error("storage read", e),
reason: ErrorReason::StorageRead,
};
}
}) else {
return Phase::Error {
resp: not_found(),
reason: ErrorReason::NotFound,
};
};
let body_sha256_before = world::sha256_hex(&stage.body);
let delete_meta = hs::request_meta_headers(
&headers,
&core.persist_header_allowlist,
&core.persist_header_user_deny,
);
let delete_content_type = headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
if let Err(e) = core
.append_to_ledger(AuditAppendJob {
ledger_world: "var/log/deletes",
event_type: "delete_intent",
target: world.clone(),
body_sha256: body_sha256_before.clone(),
size: 0,
content_type: delete_content_type.clone(),
headers: delete_meta.clone(),
key: core.hmac_key.clone(),
})
.await
{
return Phase::Error {
resp: blocking_storage_error("delete audit intent", e),
reason: ErrorReason::StorageWriteAudit,
};
}
trace.emit_aux("audit_intent");
let was_first = !core.delete_ledger_created.swap(true, Ordering::AcqRel);
if was_first {
core.durable_world_count.fetch_add(1, Ordering::Relaxed);
}
core.install_tombstone(&world).await;
trace.emit_aux("read_cache_drained");
let ok = core.delete_world_blocking(&world).await;
core.clear_tombstone(&world);
if !ok {
return Phase::Error {
resp: server_error("delete failed after audit intent".to_string()),
reason: ErrorReason::StorageWriteAudit,
};
}
trace.emit_aux("physical_deleted");
if store::is_persistent(&world) {
core.storage_body_bytes
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
Some(used.saturating_sub(stage.body.len()))
})
.ok();
core.durable_world_count
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
Some(count.saturating_sub(1))
})
.ok();
}
trace.emit_aux("counter_decremented");
core.notify("DELETE", &world, "");
trace.emit_aux("notify_sent");
if let Err(commit_err) = core
.append_to_ledger(AuditAppendJob {
ledger_world: "var/log/deletes",
event_type: "delete_commit",
target: world.clone(),
body_sha256: body_sha256_before.clone(),
size: 0,
content_type: delete_content_type.clone(),
headers: delete_meta.clone(),
key: core.hmac_key.clone(),
})
.await
{
eprintln!(" WARNING: delete_commit audit append failed for {world}: {commit_err:?}");
trace.emit_aux_kv("audit_commit_failed", &format!("err={commit_err:?}"));
match core
.append_to_ledger(AuditAppendJob {
ledger_world: "var/log/deletes",
event_type: "delete_commit_failed",
target: world.clone(),
body_sha256: body_sha256_before,
size: 0,
content_type: delete_content_type,
headers: delete_meta,
key: core.hmac_key.clone(),
})
.await
{
Ok(_) => {
trace.emit_aux("audit_commit_failed_event_logged");
}
Err(failed_event_err) => {
eprintln!(
" WARNING: delete_commit_failed audit append also failed for {world}: {failed_event_err:?}"
);
trace.emit_aux_kv(
"audit_commit_failed_event_failed",
&format!("err={failed_event_err:?}"),
);
}
}
} else {
trace.emit_aux("audit_commit");
}
Phase::CommittedWrite((StatusCode::NO_CONTENT, "").into_response())
}
fn blocking_storage_error(scope: &str, err: BlockingSqliteError) -> Response {
match err {
BlockingSqliteError::Sqlite(err) => storage_error(scope, err),
BlockingSqliteError::Worker => server_error(format!("{scope} worker failed")),
}
}