use std::future::Future;
use std::ops::ControlFlow;
use anodizer_core::retry::{RetryPolicy, is_retriable, retry_async};
use crate::release_log;
pub(crate) fn format_retry_warn(label: &str, attempt: u32, max: u32, status: u16) -> String {
format!("release: {label} failed (retriable, attempt {attempt}/{max}, status={status})")
}
pub(crate) async fn retry_octocrab_call<T, F, Fut>(
policy: &RetryPolicy,
label: &'static str,
mut make_call: F,
) -> Result<T, octocrab::Error>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, octocrab::Error>>,
{
let max = policy.max_attempts;
retry_async(policy, |attempt| {
let fut = make_call();
async move {
match fut.await {
Ok(v) => Ok(v),
Err(err) => {
let (status, retriable) = classify_retriability(&err);
if retriable {
release_log().warn(&format_retry_warn(label, attempt, max, status));
Err(ControlFlow::Continue(err))
} else {
Err(ControlFlow::Break(err))
}
}
}
}
})
.await
}
fn classify_retriability(err: &octocrab::Error) -> (u16, bool) {
use anodizer_core::retry::{HttpError, Retriable};
match err {
octocrab::Error::GitHub { source, .. } => {
let status = source.status_code.as_u16();
let probe = HttpError::new(std::io::Error::other("status probe"), status);
(status, is_retriable(&probe))
}
octocrab::Error::Hyper { .. }
| octocrab::Error::Http { .. }
| octocrab::Error::Service { .. }
| octocrab::Error::Other { .. }
| octocrab::Error::Serde { .. }
| octocrab::Error::Json { .. } => {
let probe = Retriable::new(std::io::Error::other("transport probe"));
(0, is_retriable(&probe))
}
_ => {
(0, false)
}
}
}
pub(crate) fn is_octocrab_404(err: &octocrab::Error) -> bool {
matches!(
err,
octocrab::Error::GitHub { source, .. } if source.status_code.as_u16() == 404
)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
fn spawn_oneshot_http_responder(responses: Vec<&'static str>) -> (SocketAddr, Arc<AtomicU32>) {
let listener =
TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for retry-helper test");
let addr = listener
.local_addr()
.expect("local_addr on freshly bound listener");
let counter = Arc::new(AtomicU32::new(0));
let counter_inner = counter.clone();
std::thread::spawn(move || {
for (i, resp) in responses.iter().enumerate() {
let (mut stream, _) = match listener.accept() {
Ok(pair) => pair,
Err(_) => return,
};
counter_inner.fetch_add(1, Ordering::SeqCst);
let mut buf = [0u8; 8192];
let _ = stream.set_read_timeout(Some(Duration::from_millis(500)));
let _ = stream.read(&mut buf);
let _ = stream.write_all(resp.as_bytes());
let _ = stream.flush();
let _ = stream.shutdown(std::net::Shutdown::Both);
if i == responses.len() - 1 {
break;
}
}
});
(addr, counter)
}
fn build_test_octocrab(addr: SocketAddr) -> octocrab::Octocrab {
let builder = octocrab::OctocrabBuilder::new()
.base_uri(format!("http://{addr}/"))
.expect("OctocrabBuilder::base_uri accepts loopback URL");
builder
.build()
.expect("OctocrabBuilder::build succeeds on loopback URL")
}
#[tokio::test]
async fn retries_5xx_then_succeeds() {
let (addr, calls) = spawn_oneshot_http_responder(vec![
"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n[]",
]);
let octo = build_test_octocrab(addr);
let policy = RetryPolicy {
max_attempts: 5,
base_delay: Duration::from_millis(1),
max_delay: Duration::from_millis(2),
};
let result: Result<Vec<serde_json::Value>, octocrab::Error> =
retry_octocrab_call(&policy, "test list", || async {
octo.get("/test", None::<&()>).await
})
.await;
assert!(
result.is_ok(),
"5xx must retry to success: {:?}",
result.err()
);
assert_eq!(
calls.load(Ordering::SeqCst),
3,
"expected 2 retries past 503 + 1 success"
);
}
#[tokio::test]
async fn fast_fails_4xx_without_retry() {
let (addr, calls) = spawn_oneshot_http_responder(vec![
"HTTP/1.1 404 Not Found\r\nContent-Type: application/json\r\nContent-Length: 27\r\n\r\n{\"message\":\"Not Found\"} ",
]);
let octo = build_test_octocrab(addr);
let policy = RetryPolicy {
max_attempts: 5,
base_delay: Duration::from_millis(1),
max_delay: Duration::from_millis(2),
};
let result: Result<Vec<serde_json::Value>, octocrab::Error> =
retry_octocrab_call(&policy, "test list", || async {
octo.get("/test", None::<&()>).await
})
.await;
assert!(result.is_err(), "4xx must surface as Err, got Ok");
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"4xx must NOT retry (fast-fail honors classifier)"
);
}
#[tokio::test]
async fn respects_max_attempts_one() {
let (addr, calls) = spawn_oneshot_http_responder(vec![
"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
]);
let octo = build_test_octocrab(addr);
let policy = RetryPolicy {
max_attempts: 1,
base_delay: Duration::from_millis(1),
max_delay: Duration::from_millis(2),
};
let result: Result<Vec<serde_json::Value>, octocrab::Error> =
retry_octocrab_call(&policy, "test list", || async {
octo.get("/test", None::<&()>).await
})
.await;
assert!(result.is_err(), "attempts=1 + 503 must surface Err");
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"attempts=1 must produce exactly one octocrab call"
);
}
#[test]
fn format_retry_warn_shape_pins_shared_format() {
let s = format_retry_warn("delete release", 3, 10, 503);
assert_eq!(
s,
"release: delete release failed (retriable, attempt 3/10, status=503)"
);
}
}