use std::sync::{Condvar, Mutex};
use fast_rands::Rand;
pub fn api_key() -> String {
if let Ok(key) = std::env::var("GEMINI_API_KEY")
&& !key.is_empty()
{
return key.trim_matches('"').to_string();
}
let env_map = agy_bridge::load_dotenv();
if let Some(key) = env_map.get("GEMINI_API_KEY")
&& !key.is_empty()
{
return key.trim_matches('"').to_string();
}
let dotenv_path = std::env::current_dir().unwrap_or_default().join(".env");
panic!(
"GEMINI_API_KEY not set in environment or in {dotenv}",
dotenv = dotenv_path.display(),
);
}
/// Permit count used for the live-test gate when `AGY_BRIDGE_MAX_CONCURRENT_TESTS`
/// is unset or does not parse as a `usize`.
const DEFAULT_MAX_CONCURRENT: usize = 3;
/// Upper bound, in milliseconds, passed to the random start-up stagger in `run_live_test`.
const STAGGER_MAX_MS: u64 = 2000;
/// A small blocking counting semaphore built from a `Mutex` and a `Condvar`.
///
/// Permits are handed out by [`CountingSemaphore::acquire`] and returned when the
/// resulting [`SemaphoreGuard`] is dropped.
struct CountingSemaphore {
    /// Number of permits currently handed out.
    state: Mutex<usize>,
    /// Notified each time a permit is returned.
    cvar: Condvar,
    /// Maximum number of permits that may be outstanding at once.
    max_permits: usize,
}

impl CountingSemaphore {
    /// Creates a semaphore with no permits in use and room for `max_permits`.
    const fn new(max_permits: usize) -> Self {
        Self {
            max_permits,
            cvar: Condvar::new(),
            state: Mutex::new(0),
        }
    }

    /// Blocks the calling thread until a permit is free, then claims it.
    ///
    /// A poisoned mutex is recovered rather than propagated: the counter is
    /// read and updated as usual.
    fn acquire(&self) -> SemaphoreGuard<'_> {
        let mut in_use = match self.state.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        loop {
            if *in_use < self.max_permits {
                *in_use += 1;
                return SemaphoreGuard { sem: self };
            }
            // Re-check the counter after every wake-up, including poisoned ones.
            in_use = match self.cvar.wait(in_use) {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
        }
    }
}

/// A claimed permit; dropping it hands the permit back to its semaphore.
struct SemaphoreGuard<'a> {
    /// The semaphore this permit was taken from.
    sem: &'a CountingSemaphore,
}

impl Drop for SemaphoreGuard<'_> {
    /// Decrements the in-use counter and wakes one waiting thread.
    fn drop(&mut self) {
        let owner = self.sem;
        let mut in_use = match owner.state.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        *in_use -= 1;
        owner.cvar.notify_one();
    }
}
fn max_concurrent_tests() -> usize {
std::env::var("AGY_BRIDGE_MAX_CONCURRENT_TESTS")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map_or(DEFAULT_MAX_CONCURRENT, |n| n.max(1))
}
/// Gate sized with `DEFAULT_MAX_CONCURRENT` permits; `live_gate` returns this
/// instance whenever the configured limit equals the default.
static LIVE_GATE: CountingSemaphore = CountingSemaphore::new(DEFAULT_MAX_CONCURRENT);
/// Returns the semaphore that limits how many live tests run at once.
///
/// The limit from `max_concurrent_tests()` is read once, on first call. If it
/// differs from `DEFAULT_MAX_CONCURRENT`, a dedicated semaphore of that size is
/// created (and announced on stderr); otherwise the static `LIVE_GATE` is used.
fn live_gate() -> &'static CountingSemaphore {
    use std::sync::OnceLock;
    static CUSTOM_GATE: OnceLock<Option<CountingSemaphore>> = OnceLock::new();

    CUSTOM_GATE
        .get_or_init(|| {
            let n = max_concurrent_tests();
            (n != DEFAULT_MAX_CONCURRENT).then(|| {
                eprintln!("[GATE] Using custom concurrency limit: {n}");
                CountingSemaphore::new(n)
            })
        })
        .as_ref()
        .unwrap_or(&LIVE_GATE)
}
/// Runs a live test body under the shared concurrency gate, retrying transient failures.
///
/// The call proceeds in three phases:
/// 1. Blocks until a permit from `live_gate()` is available. The permit is held
///    until this function returns or unwinds.
/// 2. Sleeps for a random stagger bounded by `STAGGER_MAX_MS` milliseconds.
/// 3. Calls `f` in a loop. `Ok(())` ends the test. An error accepted by
///    `is_retryable_error` triggers a backoff sleep (5s, doubling, capped at 15s
///    and clamped to the remaining budget) followed by another attempt.
///
/// The three-minute retry budget is measured from the end of the stagger sleep.
///
/// # Parameters
///
/// * `test_name` - label used in log and panic messages.
/// * `f` - the test body; may be invoked several times.
///
/// # Panics
///
/// * When `f` returns a retryable error after the budget has elapsed.
/// * When `f` returns a non-retryable error.
pub fn run_live_test<F>(test_name: &str, f: F)
where
    F: Fn() -> Result<(), agy_bridge::error::Error>,
{
    let _permit = live_gate().acquire();
    eprintln!("[GATE] '{test_name}' acquired live-test permit");

    // Spread start times so tests released together do not all hit the backend at once.
    let max_stagger = usize::try_from(STAGGER_MAX_MS).unwrap_or(usize::MAX);
    let stagger =
        std::time::Duration::from_millis(fast_rands::StdRand::new().between(0, max_stagger) as u64);
    eprintln!(
        "[STAGGER] '{test_name}' waiting {}ms before starting",
        stagger.as_millis()
    );
    std::thread::sleep(stagger);

    let start = std::time::Instant::now();
    let budget = std::time::Duration::from_mins(3);
    let max_backoff = std::time::Duration::from_secs(15);
    let mut sleep_duration = std::time::Duration::from_secs(5);
    let mut attempt = 1;
    loop {
        match f() {
            Ok(()) => return,
            Err(ref e) if is_retryable_error(e) => {
                let elapsed = start.elapsed();
                assert!(
                    elapsed < budget,
                    "Test '{test_name}' failed: budget exhausted on attempt {attempt} \
                     with retryable error: {e}"
                );
                // Strictly positive: `elapsed < budget` was asserted just above.
                let remaining = budget.saturating_sub(elapsed);
                // Never sleep past the end of the budget.
                let current_sleep = std::cmp::min(sleep_duration, remaining);
                // Debug formatting keeps sub-second sleeps visible instead of printing "0s".
                eprintln!(
                    "[RETRY] Test '{test_name}' failed on attempt {attempt} with retryable error: {e}\n\
                     Waiting {current_sleep:?} before retry... (Elapsed: {elapsed:?}, Remaining budget: {remaining:?})"
                );
                std::thread::sleep(current_sleep);
                sleep_duration = std::cmp::min(sleep_duration * 2, max_backoff);
                attempt += 1;
            }
            Err(e) => {
                panic!("[FAILURE] Test '{test_name}' failed with non-retryable error: {e}");
            }
        }
    }
}
/// Decides whether a failed live-test attempt should be retried.
///
/// An error is retryable when any of the following holds:
/// * the error's own `is_retryable()` reports `true`;
/// * it is a `Timeout` or `Stream` error;
/// * it is a `BackendError` whose message contains, case-insensitively,
///   `"event loop is closed"` or `"cancellederror"`.
fn is_retryable_error(err: &agy_bridge::error::Error) -> bool {
    use agy_bridge::error::Error;

    // Lower-case fragments of backend messages treated as transient.
    const TRANSIENT_BACKEND_MARKERS: [&str; 2] = ["event loop is closed", "cancellederror"];

    err.is_retryable()
        || match err {
            Error::Timeout { .. } | Error::Stream(_) => true,
            Error::BackendError { message } => {
                let lowered = message.to_lowercase();
                TRANSIENT_BACKEND_MARKERS
                    .iter()
                    .any(|marker| lowered.contains(*marker))
            }
            _ => false,
        }
}