use std::sync::Arc;
use anodizer_core::retry::{RetryPolicy, jitter_duration};
use anyhow::{Context as _, Result};
use super::secondary_rate_limit::{RetryAfterCapture, secondary_rl_delay_with_env};
use super::spec::{AlreadyExistsAction, classify_already_exists, upload_retry_locals};
use super::upload_outcome::{UploadAttemptOutcome, classify_upload_attempt};
use super::{
check_github_rate_limit_with_env, delete_release_asset_by_name, find_release_asset_probe,
format_retry_warn,
};
use crate::release_log;
/// Lower bound on the number of upload attempts made per asset.
///
/// `upload_release_asset` uses the larger of this value and the attempt count
/// derived from the caller's retry policy, so a policy configured with fewer
/// attempts still gets at least this many tries.
pub(crate) const MIN_UPLOAD_TRANSIENT_ATTEMPTS: u32 = 3;
/// Borrowed bundle of everything `upload_release_asset` needs to upload one
/// artifact to one release.
pub(crate) struct UploadAssetRequest<'a> {
/// GitHub client used for the upload, the existing-asset probe and the delete.
pub octo: &'a Arc<octocrab::Octocrab>,
/// Repository owner, passed to `octo.repos(owner, repo)`.
pub owner: &'a str,
/// Repository name, passed to `octo.repos(owner, repo)`.
pub repo: &'a str,
/// Numeric id of the release the asset is attached to.
pub release_id: u64,
/// Release tag; only used in log and error messages.
pub tag: &'a str,
/// Local path of the artifact; read from disk on every attempt.
pub path: &'a std::path::Path,
/// Asset name to upload under, also used to look up / delete an existing asset.
pub file_name: &'a str,
/// Forwarded to `classify_already_exists`; when `false`, an existing asset
/// with different bytes makes the upload fail instead of being overwritten.
pub replace_existing_artifacts: bool,
/// Retry policy: source of the attempt count and backoff bounds, and
/// forwarded to the probe and delete helpers.
pub policy: &'a RetryPolicy,
/// Optional retry-after capture, forwarded to the probe/delete helpers and
/// to the secondary-rate-limit delay computation.
pub retry_after: Option<&'a RetryAfterCapture>,
/// Token handed to `check_github_rate_limit_with_env` on a primary rate limit.
pub token: &'a str,
/// Environment lookup passed to the rate-limit helpers.
pub env_source: &'a dyn anodizer_core::EnvSource,
}
/// Uploads a single artifact file to a GitHub release, retrying recoverable failures.
///
/// At most `max(configured attempts, MIN_UPLOAD_TRANSIENT_ATTEMPTS)` attempts are made.
/// The artifact is re-read from disk on every attempt because the request body is moved
/// into the upload call. Each attempt's result is classified by `classify_upload_attempt`:
///
/// * `Success` — the function returns `Ok(())`.
/// * `AlreadyExists` — the existing asset is probed and `classify_already_exists` decides
///   whether to skip the upload, reject it, or delete the existing asset and retry.
/// * `SecondaryRateLimited` — sleeps for the jittered delay from
///   `secondary_rl_delay_with_env`, then retries.
/// * `PrimaryRateLimited` — runs `check_github_rate_limit_with_env`, then retries.
/// * `NotFound` / `TransientRetry` — logs a warning, backs off exponentially, then retries.
/// * `Fatal` — returns the error immediately.
///
/// # Errors
///
/// Returns an error when the artifact cannot be read, when the existing-asset lookup
/// fails, when an existing asset may not be replaced, when a partial asset cannot be
/// deleted, on a fatal upload error, or when every attempt has been used up (the last
/// error is returned with a "failed after N attempts" context).
pub(crate) async fn upload_release_asset(req: UploadAssetRequest<'_>) -> Result<()> {
    let UploadAssetRequest {
        octo,
        owner,
        repo,
        release_id,
        tag,
        path,
        file_name,
        replace_existing_artifacts,
        policy,
        retry_after,
        token,
        env_source,
    } = req;
    let (configured_attempts, initial_retry_delay, max_retry_delay) = upload_retry_locals(policy);
    let max_upload_attempts = std::cmp::max(configured_attempts, MIN_UPLOAD_TRANSIENT_ATTEMPTS);

    // Exponential backoff for the 1-based `attempt`: `initial * 2^(attempt - 1)`, capped at
    // `max_retry_delay`. The checked arithmetic matters once a policy allows more than 32
    // attempts: a plain `2u32.pow(..)` would panic in debug builds and wrap to a zero delay
    // in release builds. Any overflow necessarily exceeds the cap, so it saturates to it.
    let backoff_delay = move |attempt: u32| {
        2u32.checked_pow(attempt - 1)
            .and_then(|factor| initial_retry_delay.checked_mul(factor))
            .map_or(max_retry_delay, |delay| {
                std::cmp::min(delay, max_retry_delay)
            })
    };

    // Error of the most recent failed attempt; `None` means the loop ended in a state
    // that is treated as success (uploaded, or deliberately skipped).
    let mut last_err: Option<anyhow::Error> = None;
    // Set once a fully uploaded (non-partial) existing asset has been deleted, so a
    // second "already exists" for such an asset is not answered with another delete.
    let mut overwrite_attempted = false;

    for attempt in 1..=max_upload_attempts {
        let data = std::fs::read(path)
            .with_context(|| format!("release: read artifact {}", path.display()))?;
        let local_size = data.len() as u64;
        let result = octo
            .repos(owner, repo)
            .releases()
            .upload_asset(release_id, file_name, data.into())
            .send()
            .await;
        let outcome = classify_upload_attempt(&result);
        match outcome {
            UploadAttemptOutcome::Success => {
                last_err = None;
                break;
            }
            UploadAttemptOutcome::AlreadyExists => {
                let err = result.expect_err("AlreadyExists outcome guarantees Err variant");
                let probe = find_release_asset_probe(
                    octo,
                    owner,
                    repo,
                    release_id,
                    file_name,
                    policy,
                    retry_after,
                )
                .await
                .with_context(|| {
                    format!(
                        "release: look up existing asset '{}' on release '{}'",
                        file_name, tag
                    )
                })?;
                // "Partial" = an asset was found but is not flagged as uploaded.
                let partial = probe.is_some_and(|p| !p.uploaded);
                if overwrite_attempted && !partial {
                    // Already deleted once and the asset is back: keep it rather than loop.
                    release_log().warn(&format!(
                        "skipped re-upload — existing asset '{file_name}' on release '{tag}' \
                         reappeared after delete+retry; stale asset kept"
                    ));
                    last_err = None;
                    break;
                }
                match classify_already_exists(replace_existing_artifacts, probe, local_size) {
                    AlreadyExistsAction::SkipIdempotent => {
                        last_err = None;
                        break;
                    }
                    AlreadyExistsAction::BailReplaceForbidden => {
                        return Err(anyhow::anyhow!(err)).with_context(|| {
                            format!(
                                "release: artifact '{}' already exists on release '{}' \
                                 with different bytes and `replace_existing_artifacts: false` \
                                 forbids overwriting (set \
                                 `release.replace_existing_artifacts: true` \
                                 to permit overwrites)",
                                file_name, tag
                            )
                        });
                    }
                    AlreadyExistsAction::DeleteAndRetry => {
                        // Fall through to the delete below.
                    }
                }
                match delete_release_asset_by_name(
                    octo,
                    owner,
                    repo,
                    release_id,
                    file_name,
                    policy,
                    retry_after,
                )
                .await
                {
                    Ok(_) => {
                        if !partial {
                            overwrite_attempted = true;
                        }
                        // Keep the 422 as the reported error in case no attempts remain.
                        last_err = Some(anyhow::anyhow!(err));
                        if attempt < max_upload_attempts {
                            tokio::time::sleep(jitter_duration(backoff_delay(attempt))).await;
                        }
                        continue;
                    }
                    Err(del_err) if partial => {
                        return Err(del_err).with_context(|| {
                            format!(
                                "release: delete partial asset '{}' on release '{}' \
                                 left by an interrupted upload",
                                file_name, tag
                            )
                        });
                    }
                    Err(del_err) => {
                        // Best effort: a failed overwrite of a complete asset is only a warning.
                        release_log().warn(&format!(
                            "skipped overwrite of existing asset '{file_name}' on release '{tag}' \
                             — size mismatch and delete failed: {del_err}; stale asset kept"
                        ));
                        last_err = None;
                        break;
                    }
                }
            }
            UploadAttemptOutcome::SecondaryRateLimited => {
                let err = result.expect_err("SecondaryRateLimited outcome guarantees Err variant");
                let delay = jitter_duration(secondary_rl_delay_with_env(retry_after, env_source));
                release_log().warn(&format!(
                    "upload of '{file_name}' hit GitHub secondary \
                     rate limit; sleeping {:.1}s before retry \
                     (attempt {attempt}/{})",
                    delay.as_secs_f64(),
                    max_upload_attempts,
                ));
                if attempt < max_upload_attempts {
                    tokio::time::sleep(delay).await;
                }
                last_err = Some(anyhow::anyhow!(err));
                continue;
            }
            UploadAttemptOutcome::PrimaryRateLimited => {
                let err = result.expect_err("PrimaryRateLimited outcome guarantees Err variant");
                release_log().status(&format!(
                    "rate limited on upload of '{file_name}', checking rate limits..."
                ));
                // NOTE(review): no local backoff here; presumably the rate-limit check does
                // any waiting that is needed — confirm against its implementation.
                check_github_rate_limit_with_env(&reqwest::Client::new(), token, 100, env_source)
                    .await;
                last_err = Some(anyhow::anyhow!(err));
                continue;
            }
            UploadAttemptOutcome::NotFound => {
                let err = result.expect_err("NotFound outcome guarantees Err variant");
                let label = format!("upload of '{file_name}'");
                release_log().warn(&format_retry_warn(
                    &label,
                    attempt,
                    max_upload_attempts,
                    404,
                ));
                last_err = Some(anyhow::anyhow!(err));
                if attempt < max_upload_attempts {
                    tokio::time::sleep(jitter_duration(backoff_delay(attempt))).await;
                }
                continue;
            }
            UploadAttemptOutcome::TransientRetry => {
                let err = result.expect_err("TransientRetry outcome guarantees Err variant");
                // Status 0 is logged for errors that did not come back as a GitHub API error.
                let status = match &err {
                    octocrab::Error::GitHub { source, .. } => source.status_code.as_u16(),
                    _ => 0,
                };
                let label = format!("upload of '{file_name}'");
                release_log().warn(&format_retry_warn(
                    &label,
                    attempt,
                    max_upload_attempts,
                    status,
                ));
                last_err = Some(anyhow::anyhow!(err));
                if attempt < max_upload_attempts {
                    tokio::time::sleep(jitter_duration(backoff_delay(attempt))).await;
                }
                continue;
            }
            UploadAttemptOutcome::Fatal => {
                let err = result.expect_err("Fatal outcome guarantees Err variant");
                return Err(anyhow::anyhow!(err)).with_context(|| {
                    format!(
                        "release: upload artifact '{}' to release '{}'",
                        file_name, tag
                    )
                });
            }
        }
    }

    // Reaching here with an error recorded means every attempt was consumed.
    if let Some(err) = last_err {
        return Err(err).with_context(|| {
            format!(
                "release: upload artifact '{}' to release '{}' failed after {} attempts",
                file_name, tag, max_upload_attempts
            )
        });
    }
    Ok(())
}
// Integration-style tests for `upload_release_asset` against a local canned-response
// HTTP server. `spawn_oneshot_http_responder_with` is defined elsewhere; presumably it
// replays the listed responses in order, one per incoming request, and the returned
// counter tracks how many requests were served — confirm against its implementation.
// Going by the assertion messages below, one upload attempt issues a GET for the
// release followed by the POST of the asset.
#[cfg(test)]
mod tests {
use super::*;
use crate::test_support::{build_test_octocrab, test_retry_policy};
use anodizer_core::test_helpers::responder::spawn_oneshot_http_responder_with;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
/// Environment source in which no variable is ever set.
struct EmptyEnv;
impl anodizer_core::EnvSource for EmptyEnv {
fn var(&self, _key: &str) -> Option<String> {
None
}
}
/// Builds a raw HTTP/1.1 response with a JSON content type and a
/// `Content-Length` computed from `body`.
fn http(status_line: &str, body: &str) -> String {
format!(
"HTTP/1.1 {status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{body}",
body.len()
)
}
/// `200 OK` response carrying the JSON for release id 7 / tag `v1`, with all
/// URLs (including the upload URL) pointing back at the test server `addr`.
fn release_json(addr: SocketAddr) -> String {
let body = format!(
r#"{{
"url": "http://{addr}/repos/o/r/releases/7",
"html_url": "http://{addr}/o/r/releases/tag/v1",
"assets_url": "http://{addr}/repos/o/r/releases/7/assets",
"upload_url": "http://{addr}/repos/o/r/releases/7/assets{{?name,label}}",
"id": 7,
"node_id": "RE_kwDO",
"tag_name": "v1",
"target_commitish": "master",
"name": "v1",
"body": null,
"draft": false,
"prerelease": false,
"created_at": "2026-01-01T00:00:00Z",
"published_at": null,
"author": null,
"assets": []
}}"#
);
http("200 OK", &body)
}
/// JSON body (no HTTP framing) for a release asset with id 99 and the given
/// `name`, `size` and `state`.
fn asset_json(addr: SocketAddr, name: &str, size: u64, state: &str) -> String {
format!(
r#"{{
"url": "http://{addr}/repos/o/r/releases/assets/99",
"browser_download_url": "http://{addr}/o/r/releases/download/v1/{name}",
"id": 99,
"node_id": "RA_kwDO",
"name": "{name}",
"label": null,
"state": "{state}",
"content_type": "application/octet-stream",
"size": {size},
"download_count": 0,
"created_at": "2026-01-01T00:00:00Z",
"updated_at": "2026-01-01T00:00:00Z",
"uploader": null
}}"#
)
}
/// `201 Created` response for a successful upload: an asset in state `uploaded`.
fn uploaded_asset_response(addr: SocketAddr, name: &str, size: u64) -> String {
http("201 Created", &asset_json(addr, name, size, "uploaded"))
}
// Canned error bodies used by the scenarios below.
const BODY_401: &str =
r#"{"message":"Bad credentials","documentation_url":"https://docs.github.com/rest"}"#;
const BODY_404: &str = r#"{"message":"Not Found"}"#;
const BODY_422_ALREADY_EXISTS: &str = r#"{"message":"Validation Failed","errors":[{"resource":"ReleaseAsset","code":"already_exists","field":"name"}]}"#;
/// Writes `bytes` to `dir/name` and returns the resulting path.
fn write_artifact(dir: &std::path::Path, name: &str, bytes: &[u8]) -> std::path::PathBuf {
let p = dir.join(name);
std::fs::write(&p, bytes).expect("write artifact fixture");
p
}
/// Runs the upload with an environment in which no variables are set.
async fn run_upload(
addr: SocketAddr,
path: &std::path::Path,
replace_existing_artifacts: bool,
) -> Result<()> {
run_upload_with_env(addr, path, replace_existing_artifacts, &EmptyEnv).await
}
/// Calls `upload_release_asset` for repo `o/r`, release 7 / tag `v1`, asset name
/// `app.tar.gz`, using the test client for `addr`, the test retry policy, no
/// retry-after capture and the given environment source.
async fn run_upload_with_env(
addr: SocketAddr,
path: &std::path::Path,
replace_existing_artifacts: bool,
env_source: &dyn anodizer_core::EnvSource,
) -> Result<()> {
let octo = build_test_octocrab(addr);
let policy = test_retry_policy();
upload_release_asset(UploadAssetRequest {
octo: &octo,
owner: "o",
repo: "r",
release_id: 7,
tag: "v1",
path,
file_name: "app.tar.gz",
replace_existing_artifacts,
policy: &policy,
retry_after: None,
token: "test-token",
env_source,
})
.await
}
/// A 401 on the first upload attempt is retried; the second attempt succeeds.
#[tokio::test]
async fn transient_401_then_success_retries_and_succeeds() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = write_artifact(tmp.path(), "app.tar.gz", b"bytes");
let (addr, calls) = spawn_oneshot_http_responder_with(|addr| {
vec![
release_json(addr),
http("401 Unauthorized", BODY_401),
release_json(addr),
uploaded_asset_response(addr, "app.tar.gz", 5),
]
});
let result = run_upload(addr, &path, false).await;
assert!(
result.is_ok(),
"401 must retry to success: {:?}",
result.err()
);
assert_eq!(
calls.load(Ordering::SeqCst),
4,
"expected 2 attempts x (GET release + POST upload)"
);
}
/// A 422 "already exists" caused by an asset that is not in state `uploaded`
/// (state `starter`, size 4 vs. 12 local bytes) leads to delete + re-upload,
/// even though `replace_existing_artifacts` is `false`.
#[tokio::test]
async fn partial_asset_422_deletes_then_reuploads() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = write_artifact(tmp.path(), "app.tar.gz", b"full-content");
// Asset listing containing only the partial asset; served once for the probe
// and once for the delete-by-name lookup (per the assertion message below).
let partial_list = |addr| {
http(
"200 OK",
&format!("[{}]", asset_json(addr, "app.tar.gz", 4, "starter")),
)
};
let (addr, calls) = spawn_oneshot_http_responder_with(|addr| {
vec![
release_json(addr),
http("401 Unauthorized", BODY_401),
release_json(addr),
http("422 Unprocessable Entity", BODY_422_ALREADY_EXISTS),
partial_list(addr),
partial_list(addr),
http("204 No Content", ""),
release_json(addr),
uploaded_asset_response(addr, "app.tar.gz", 12),
]
});
let result = run_upload(addr, &path, false).await;
assert!(
result.is_ok(),
"partial-asset 422 must delete + re-upload to success: {:?}",
result.err()
);
assert_eq!(
calls.load(Ordering::SeqCst),
9,
"expected 3 attempts (2 GET+POST pairs + 1 GET+POST) + probe list + delete list + DELETE"
);
}
/// A 422 for a fully uploaded asset of a different size with
/// `replace_existing_artifacts = false` fails without deleting or re-uploading.
#[tokio::test]
async fn published_asset_422_still_bails_when_replace_forbidden() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = write_artifact(tmp.path(), "app.tar.gz", b"different-content");
let (addr, calls) = spawn_oneshot_http_responder_with(|addr| {
vec![
release_json(addr),
http("422 Unprocessable Entity", BODY_422_ALREADY_EXISTS),
http(
"200 OK",
&format!("[{}]", asset_json(addr, "app.tar.gz", 4, "uploaded")),
),
]
});
let result = run_upload(addr, &path, false).await;
let err = result.expect_err("differing published asset + replace=false must bail");
assert!(
format!("{err:#}").contains("replace_existing_artifacts"),
"error must point at the replace_existing_artifacts knob: {err:#}"
);
assert_eq!(
calls.load(Ordering::SeqCst),
3,
"bail must not delete or re-upload (1 GET + 1 POST + 1 probe list)"
);
}
/// A secondary-rate-limit 403 on every attempt exhausts the attempt budget and
/// the final error still names the secondary rate limit.
#[tokio::test]
async fn secondary_rate_limit_exhaustion_surfaces_actionable_error() {
let body_403 = r#"{"message":"You have exceeded a secondary rate limit and have been temporarily blocked from content creation. Please retry your request again later.","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#secondary-rate-limits"}"#;
let tmp = tempfile::tempdir().expect("tempdir");
let path = write_artifact(tmp.path(), "app.tar.gz", b"bytes");
let (addr, calls) = spawn_oneshot_http_responder_with(|addr| {
let mut v = Vec::new();
for _ in 0..5 {
v.push(release_json(addr));
v.push(http("403 Forbidden", body_403));
}
v
});
// Presumably shortens the secondary-rate-limit sleep to one second per retry
// so the test stays fast — confirm in `secondary_rl_delay_with_env`.
let env =
anodizer_core::MapEnvSource::new().with("ANODIZER_GITHUB_SECONDARY_RL_DELAY_SECS", "1");
let result = run_upload_with_env(addr, &path, false, &env).await;
let err = result.expect_err("persistent secondary-RL 403 must fail after bounded attempts");
let rendered = format!("{err:#}");
assert!(
rendered.contains("failed after 5 attempts"),
"error must report the exhausted attempt budget: {rendered}"
);
assert!(
rendered.to_lowercase().contains("secondary rate limit"),
"error must name the secondary rate limit (actionable, not a bare 403): {rendered}"
);
assert_eq!(
calls.load(Ordering::SeqCst),
10,
"expected 5 attempts x (GET release + POST 403)"
);
}
/// A 404 on every request fails after the bounded number of attempts, with
/// exactly one request per attempt.
#[tokio::test]
async fn permanent_404_fails_after_bounded_attempts() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = write_artifact(tmp.path(), "app.tar.gz", b"bytes");
let (addr, calls) =
spawn_oneshot_http_responder_with(|_addr| vec![http("404 Not Found", BODY_404); 5]);
let result = run_upload(addr, &path, false).await;
let err = result.expect_err("persistent 404 must fail after bounded attempts");
assert!(
format!("{err:#}").contains("failed after 5 attempts"),
"error must report the exhausted attempt budget: {err:#}"
);
assert_eq!(
calls.load(Ordering::SeqCst),
5,
"each attempt's GET /releases/{{id}} 404s before any POST"
);
}
}