use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use crate::release_log;
use anodizer_core::EnvSource;
#[cfg(all(test, unix))]
use anodizer_core::ProcessEnvSource;
pub(crate) type SleepFn = Box<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
pub(crate) fn tokio_sleep() -> SleepFn {
Box::new(|d| Box::pin(tokio::time::sleep(d)))
}
pub(crate) fn compute_rate_limit_sleep_secs(
remaining: u64,
threshold: u64,
reset_epoch: u64,
now: u64,
) -> Option<u64> {
if remaining > threshold {
return None;
}
let secs = if reset_epoch > now {
reset_epoch - now + 1
} else {
5
};
Some(secs)
}
fn github_api_base_from<E: EnvSource + ?Sized>(env: &E) -> String {
let raw = env
.var("ANODIZER_GITHUB_API_BASE")
.unwrap_or_else(|| "https://api.github.com".to_string());
raw.trim_end_matches('/').to_string()
}
#[cfg(all(test, unix))]
pub(crate) async fn check_github_rate_limit(client: &reqwest::Client, token: &str, threshold: u64) {
check_github_rate_limit_with_env(client, token, threshold, &ProcessEnvSource).await;
}
pub(crate) async fn check_github_rate_limit_with_env<E: EnvSource + ?Sized>(
client: &reqwest::Client,
token: &str,
threshold: u64,
env: &E,
) {
check_github_rate_limit_with_sleep(client, token, threshold, env, tokio_sleep()).await;
}
pub(crate) async fn check_github_rate_limit_with_sleep<E: EnvSource + ?Sized>(
client: &reqwest::Client,
token: &str,
threshold: u64,
env: &E,
sleep_fn: SleepFn,
) {
let url = format!("{}/rate_limit", github_api_base_from(env));
let resp = match client
.get(&url)
.header("Authorization", format!("Bearer {}", token))
.header("Accept", "application/vnd.github+json")
.header("User-Agent", anodizer_core::http::USER_AGENT)
.send()
.await
{
Ok(r) => r,
Err(_) => return, };
if !resp.status().is_success() {
return;
}
let body: serde_json::Value = match resp.json().await {
Ok(v) => v,
Err(_) => return,
};
let remaining = body
.pointer("/resources/core/remaining")
.and_then(|v| v.as_u64())
.unwrap_or(u64::MAX);
let reset_epoch = body
.pointer("/resources/core/reset")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let sleep_secs = match compute_rate_limit_sleep_secs(remaining, threshold, reset_epoch, now) {
None => return,
Some(s) => s,
};
release_log().status(&format!(
"rate limit almost reached ({remaining} remaining), sleeping for {sleep_secs}s..."
));
let duration = Duration::from_secs(sleep_secs);
let sleep = sleep_fn(duration);
tokio::pin!(sleep);
#[cfg(unix)]
{
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).ok();
tokio::select! {
_ = &mut sleep => {}
_ = tokio::signal::ctrl_c() => {
release_log().warn(
"rate-limit wait interrupted by SIGINT; release will likely fail \
on the next API call",
);
}
_ = async {
match sigterm.as_mut() {
Some(s) => { s.recv().await; }
None => std::future::pending::<()>().await,
}
} => {
release_log().warn(
"rate-limit wait interrupted by SIGTERM; release will likely fail \
on the next API call",
);
}
}
}
#[cfg(not(unix))]
{
tokio::select! {
_ = &mut sleep => {}
_ = tokio::signal::ctrl_c() => {
release_log().warn(
"rate-limit wait interrupted by Ctrl-C; release will likely fail \
on the next API call",
);
}
}
}
}
#[cfg(all(test, unix))]
mod sigterm_tests {
use super::*;
use tokio::signal::unix::{SignalKind, signal};
#[tokio::test(flavor = "current_thread")]
async fn sigterm_listener_observes_self_signal() {
let mut sigterm = signal(SignalKind::terminate())
.map_err(|e| format!("could not install SIGTERM handler: {e}"))
.ok()
.unwrap_or_else(|| panic!("SIGTERM handler install failed"));
let pid = std::process::id();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let _ = std::process::Command::new("kill")
.args(["-TERM", &pid.to_string()])
.status();
});
let recv = sigterm.recv();
let timeout = tokio::time::sleep(std::time::Duration::from_secs(30));
tokio::select! {
v = recv => {
assert!(v.is_some(), "signal stream closed before SIGTERM delivered");
}
_ = timeout => {
panic!("SIGTERM listener did not observe self-delivered signal within 30s");
}
}
}
#[test]
fn sigterm_signal_kind_is_constructible() {
let _ = SignalKind::terminate();
}
#[tokio::test]
async fn transport_failure_silently_degrades() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
let addr = listener.local_addr().expect("local_addr");
drop(listener);
let client = reqwest::Client::builder()
.resolve("api.github.com", addr)
.build()
.expect("reqwest client builds");
let fut = check_github_rate_limit(&client, "fake-token", 100);
let res = tokio::time::timeout(std::time::Duration::from_secs(5), fut).await;
assert!(
res.is_ok(),
"check_github_rate_limit must silently degrade on transport failure, not hang"
);
}
}
#[cfg(test)]
mod test_helpers {
use anodizer_core::MapEnvSource;
pub(super) fn env_with_base(base: &str) -> MapEnvSource {
MapEnvSource::new().with("ANODIZER_GITHUB_API_BASE", base)
}
pub(super) fn canned_json_200(body: &str) -> &'static str {
let raw = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
body.len(),
body,
);
Box::leak(raw.into_boxed_str())
}
}
#[cfg(test)]
mod http_tests {
use super::test_helpers::{canned_json_200, env_with_base};
use super::*;
use anodizer_core::MapEnvSource;
use anodizer_core::test_helpers::https_responder::{
https_test_client, spawn_oneshot_https_responder,
};
use std::sync::atomic::Ordering;
#[tokio::test]
async fn remaining_above_threshold_returns_without_sleep() {
let body = r#"{"resources":{"core":{"remaining":5000,"reset":9999999999,"limit":5000}}}"#;
let (addr, calls) = spawn_oneshot_https_responder(vec![canned_json_200(body)]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let fut = check_github_rate_limit_with_env(&client, "fake-token", 100, &env);
let res = tokio::time::timeout(std::time::Duration::from_secs(5), fut).await;
assert!(
res.is_ok(),
"must return promptly when remaining > threshold"
);
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"exactly one /rate_limit request should be issued"
);
}
#[tokio::test]
async fn non_success_status_silently_degrades() {
let resp = "HTTP/1.1 401 Unauthorized\r\nContent-Length: 0\r\n\r\n";
let (addr, calls) = spawn_oneshot_https_responder(vec![resp]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let fut = check_github_rate_limit_with_env(&client, "fake-token", 100, &env);
let res = tokio::time::timeout(std::time::Duration::from_secs(5), fut).await;
assert!(res.is_ok(), "401 must silently degrade, not hang or panic");
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn server_error_status_silently_degrades() {
let resp = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n\r\n";
let (addr, calls) = spawn_oneshot_https_responder(vec![resp]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let fut = check_github_rate_limit_with_env(&client, "fake-token", 100, &env);
let res = tokio::time::timeout(std::time::Duration::from_secs(5), fut).await;
assert!(res.is_ok(), "500 must silently degrade");
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn malformed_json_body_silently_degrades() {
let body = "{";
let raw = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
body.len(),
body,
);
let resp: &'static str = Box::leak(raw.into_boxed_str());
let (addr, calls) = spawn_oneshot_https_responder(vec![resp]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let fut = check_github_rate_limit_with_env(&client, "fake-token", 100, &env);
let res = tokio::time::timeout(std::time::Duration::from_secs(5), fut).await;
assert!(
res.is_ok(),
"malformed JSON must silently degrade, not panic"
);
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn missing_pointer_fields_skip_sleep() {
let body = r#"{"resources":{"other":{"remaining":1}}}"#;
let (addr, calls) = spawn_oneshot_https_responder(vec![canned_json_200(body)]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let fut = check_github_rate_limit_with_env(&client, "fake-token", 100, &env);
let res = tokio::time::timeout(std::time::Duration::from_secs(5), fut).await;
assert!(
res.is_ok(),
"missing JSON pointer must fall back to u64::MAX (no sleep)"
);
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[test]
fn base_url_strips_trailing_slash() {
let env = env_with_base("https://example.com/api/");
assert_eq!(github_api_base_from(&env), "https://example.com/api");
}
#[test]
fn base_url_defaults_to_api_github_com() {
let env = MapEnvSource::new();
assert_eq!(github_api_base_from(&env), "https://api.github.com");
}
#[tokio::test]
async fn e2e_sleeps_briefly_when_reset_is_one_second_away() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock is post-epoch")
.as_secs();
let reset = now + 1;
let body = format!(
r#"{{"resources":{{"core":{{"remaining":0,"reset":{reset},"limit":5000}}}}}}"#,
);
let (addr, calls) =
spawn_oneshot_https_responder(vec![canned_json_200(Box::leak(body.into_boxed_str()))]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let fut = check_github_rate_limit_with_env(&client, "fake-token", 100, &env);
let res = tokio::time::timeout(std::time::Duration::from_secs(8), fut).await;
assert!(
res.is_ok(),
"must complete the sleep + select wait within the bounded window"
);
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"exactly one /rate_limit request should be issued"
);
}
}
#[cfg(test)]
mod compute_tests {
use super::*;
#[test]
fn compute_returns_none_when_remaining_above_threshold() {
assert_eq!(compute_rate_limit_sleep_secs(101, 100, u64::MAX, 0), None,);
}
#[test]
fn compute_returns_future_reset_diff_plus_one_when_reset_in_future() {
assert_eq!(compute_rate_limit_sleep_secs(0, 100, 1000, 500), Some(501),);
}
#[test]
fn compute_returns_past_reset_floor_when_reset_in_past_or_equal_to_now() {
assert_eq!(compute_rate_limit_sleep_secs(0, 100, 500, 1000), Some(5),);
assert_eq!(compute_rate_limit_sleep_secs(0, 100, 1000, 1000), Some(5),);
}
#[test]
fn compute_returns_some_when_remaining_equals_threshold() {
assert_eq!(
compute_rate_limit_sleep_secs(100, 100, 2000, 1000),
Some(1001),
);
}
#[test]
fn compute_returns_some_when_remaining_is_zero() {
assert_eq!(
compute_rate_limit_sleep_secs(0, 100, 2000, 1000),
Some(1001),
);
}
}
#[cfg(test)]
mod sleep_injection_tests {
use super::test_helpers::{canned_json_200, env_with_base};
use super::*;
use anodizer_core::test_helpers::https_responder::{
https_test_client, spawn_oneshot_https_responder,
};
use std::sync::{Arc, Mutex};
fn recording_sleep() -> (SleepFn, Arc<Mutex<Vec<Duration>>>) {
let log: Arc<Mutex<Vec<Duration>>> = Arc::new(Mutex::new(Vec::new()));
let log_clone = Arc::clone(&log);
let f: SleepFn = Box::new(move |d| {
log_clone.lock().unwrap().push(d);
Box::pin(async {})
});
(f, log)
}
#[tokio::test]
async fn sleep_until_future_reset_records_correct_duration() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock is post-epoch")
.as_secs();
let reset = now + 600; let body = format!(
r#"{{"resources":{{"core":{{"remaining":0,"reset":{reset},"limit":5000}}}}}}"#,
);
let (addr, _calls) =
spawn_oneshot_https_responder(vec![canned_json_200(Box::leak(body.into_boxed_str()))]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let (sleep_fn, log) = recording_sleep();
check_github_rate_limit_with_sleep(&client, "fake-token", 100, &env, sleep_fn).await;
let recorded = log.lock().unwrap();
assert_eq!(
recorded.len(),
1,
"sleep callback must be invoked exactly once"
);
let secs = recorded[0].as_secs();
assert!(
(600..=602).contains(&secs),
"expected ~601s sleep, got {secs}s",
);
}
#[tokio::test]
async fn past_reset_records_five_second_floor() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock is post-epoch")
.as_secs();
let reset = now.saturating_sub(100); let body = format!(
r#"{{"resources":{{"core":{{"remaining":0,"reset":{reset},"limit":5000}}}}}}"#,
);
let (addr, _calls) =
spawn_oneshot_https_responder(vec![canned_json_200(Box::leak(body.into_boxed_str()))]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let (sleep_fn, log) = recording_sleep();
check_github_rate_limit_with_sleep(&client, "fake-token", 100, &env, sleep_fn).await;
let recorded = log.lock().unwrap();
assert_eq!(
recorded.len(),
1,
"sleep callback must be invoked exactly once"
);
assert_eq!(
recorded[0],
Duration::from_secs(5),
"past-reset branch must sleep exactly 5s",
);
}
#[tokio::test]
async fn no_sleep_when_remaining_above_threshold() {
let body = r#"{"resources":{"core":{"remaining":5000,"reset":9999999999,"limit":5000}}}"#;
let (addr, _calls) = spawn_oneshot_https_responder(vec![canned_json_200(body)]);
let env = env_with_base(&format!("https://{addr}"));
let client = https_test_client();
let (sleep_fn, log) = recording_sleep();
check_github_rate_limit_with_sleep(&client, "fake-token", 100, &env, sleep_fn).await;
let recorded = log.lock().unwrap();
assert!(
recorded.is_empty(),
"sleep callback must not be invoked when remaining > threshold",
);
}
}