use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use url::Url;
use crate::error::LlmError;
/// One endpoint entry that an `EndpointPool` can hand out.
#[derive(Debug, Clone)]
pub struct GonkaEndpoint {
/// Base URL of the node. `EndpointPool::new` rejects values that do not
/// parse as a URL or whose scheme is neither `http` nor `https`.
pub base_url: String,
/// Address string associated with the node.
/// NOTE(review): its exact meaning (e.g. an account/on-chain address) is not
/// visible in this file; confirm against the callers.
pub address: String,
}
/// Set of [`GonkaEndpoint`]s handed out in rotation, with a per-node
/// cooldown deadline that lets callers temporarily sideline a node.
pub struct EndpointPool {
/// Configured endpoints; `EndpointPool::new` refuses an empty list.
nodes: Vec<GonkaEndpoint>,
/// Ever-increasing counter; selection reduces it modulo `nodes.len()`.
cursor: AtomicUsize,
/// Cooldown deadline per node (same index as `nodes`), expressed in
/// nanoseconds since the Unix epoch. Starts at 0; a node is skipped while
/// its deadline lies in the future.
failed_until: Vec<AtomicU64>,
}
// Hand-written `Debug`: shows the node list and a snapshot of the cursor,
// and marks the output as non-exhaustive because the cooldown deadlines
// are intentionally left out.
impl std::fmt::Debug for EndpointPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("EndpointPool");
        rendered.field("nodes", &self.nodes);
        // Atomics are read into a plain value so the output shows a number.
        let cursor_snapshot = self.cursor.load(Ordering::Relaxed);
        rendered.field("cursor", &cursor_snapshot);
        rendered.finish_non_exhaustive()
    }
}
impl EndpointPool {
    /// Builds a pool from `nodes`, validating every endpoint URL up front.
    ///
    /// All nodes start out healthy (cooldown deadline of 0) and the rotation
    /// cursor starts at the first node.
    ///
    /// # Errors
    ///
    /// Returns [`LlmError::Other`] when:
    /// - `nodes` is empty,
    /// - a `base_url` cannot be parsed as a URL, or
    /// - a `base_url` uses a scheme other than `http` or `https`.
    pub fn new(nodes: Vec<GonkaEndpoint>) -> Result<Self, LlmError> {
        if nodes.is_empty() {
            return Err(LlmError::Other(
                "EndpointPool requires at least one node".into(),
            ));
        }
        for node in &nodes {
            let parsed = Url::parse(&node.base_url).map_err(|e| {
                LlmError::Other(format!("invalid endpoint URL '{}': {e}", node.base_url))
            })?;
            if !matches!(parsed.scheme(), "http" | "https") {
                return Err(LlmError::Other(format!(
                    "endpoint URL '{}' must use http or https scheme",
                    node.base_url
                )));
            }
        }
        // One deadline slot per node; 0 means "never marked failed".
        let failed_until = nodes.iter().map(|_| AtomicU64::new(0)).collect();
        Ok(Self {
            nodes,
            cursor: AtomicUsize::new(0),
            failed_until,
        })
    }

    /// Returns the next endpoint in rotation, skipping nodes in cooldown.
    ///
    /// This is [`Self::next_indexed`] without the index; use that method when
    /// the caller needs the index to report a failure via
    /// [`Self::mark_failed`].
    pub fn next(&self) -> &GonkaEndpoint {
        // Single source of truth for the selection algorithm (and its span).
        self.next_indexed().1
    }

    /// Puts node `idx` into cooldown for `cooldown`, starting now.
    ///
    /// The deadline is stored as nanoseconds since the Unix epoch and
    /// saturates at `u64::MAX` if the cooldown is too large to represent.
    /// An `idx` outside the pool is ignored.
    pub fn mark_failed(&self, idx: usize, cooldown: Duration) {
        if idx >= self.nodes.len() {
            return;
        }
        let cooldown_ns = u64::try_from(cooldown.as_nanos()).unwrap_or(u64::MAX);
        let deadline = now_ns().saturating_add(cooldown_ns);
        self.failed_until[idx].store(deadline, Ordering::Relaxed);
    }

    /// Number of endpoints in the pool.
    #[must_use]
    pub fn len(&self) -> usize {
        self.nodes.len()
    }

    /// Whether the pool holds no endpoints (never true for a pool built by
    /// [`Self::new`], which rejects an empty node list).
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.nodes.is_empty()
    }

    /// Returns the next endpoint in rotation together with its index.
    ///
    /// The cursor is advanced at most `len()` times; the first node whose
    /// cooldown deadline is not in the future is returned. If every node is
    /// still cooling down, the node with the earliest deadline is returned
    /// instead, so a pool built by [`Self::new`] always yields an endpoint.
    pub fn next_indexed(&self) -> (usize, &GonkaEndpoint) {
        let _span = tracing::trace_span!("llm.gonka.endpoint_next").entered();
        let n = self.nodes.len();
        // Sample the clock once so every candidate is judged against the
        // same instant.
        let now_ns = now_ns();
        for _ in 0..n {
            let idx = self.cursor.fetch_add(1, Ordering::Relaxed) % n;
            if self.failed_until[idx].load(Ordering::Relaxed) <= now_ns {
                return (idx, &self.nodes[idx]);
            }
        }
        // Every probed node is cooling down: fall back to the one that
        // recovers soonest rather than failing the request outright.
        let best = self
            .failed_until
            .iter()
            .enumerate()
            .min_by_key(|(_, deadline)| deadline.load(Ordering::Relaxed))
            .map_or(0, |(i, _)| i);
        (best, &self.nodes[best])
    }
}
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch and
/// `u64::MAX` when the nanosecond count does not fit in a `u64`.
#[inline]
pub(crate) fn now_ns() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before the epoch: treat it as "no time elapsed".
        Err(_) => Duration::ZERO,
    };
    match u64::try_from(since_epoch.as_nanos()) {
        Ok(ns) => ns,
        Err(_) => u64::MAX,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    /// Builds a pool of `n` nodes named `https://node{i}.example` with
    /// address `addr{i}`.
    fn make_pool(n: usize) -> EndpointPool {
        let nodes = (0..n)
            .map(|i| GonkaEndpoint {
                base_url: format!("https://node{i}.example"),
                address: format!("addr{i}"),
            })
            .collect();
        EndpointPool::new(nodes).expect("non-empty pool")
    }

    /// Maps a URL produced by `make_pool` back to its node index; panics on
    /// any URL that does not belong to a pool of size `n`.
    fn url_to_idx(url: &str, n: usize) -> usize {
        (0..n)
            .find(|i| url == format!("https://node{i}.example"))
            .unwrap_or_else(|| panic!("unrecognised url: {url}"))
    }

    #[test]
    fn gonka_endpoint_round_robin_three_nodes() {
        let pool = make_pool(3);
        let calls: Vec<usize> = (0..6)
            .map(|_| url_to_idx(pool.next().base_url.as_str(), 3))
            .collect();
        // Each full cycle must visit every node exactly once.
        let mut first = calls[..3].to_vec();
        first.sort_unstable();
        assert_eq!(first, vec![0, 1, 2]);
        let mut second = calls[3..].to_vec();
        second.sort_unstable();
        assert_eq!(second, vec![0, 1, 2]);
    }

    #[test]
    fn gonka_endpoint_failed_node_skipped_during_cooldown() {
        let pool = make_pool(3);
        pool.mark_failed(0, Duration::from_hours(1));
        for _ in 0..9 {
            let idx = url_to_idx(pool.next().base_url.as_str(), 3);
            assert_ne!(idx, 0, "failed node 0 must not be returned");
        }
    }

    #[test]
    fn gonka_endpoint_failed_node_restored_after_cooldown() {
        let pool = make_pool(2);
        // A zero cooldown expires immediately.
        pool.mark_failed(0, Duration::ZERO);
        let seen: Vec<usize> = (0..6)
            .map(|_| url_to_idx(pool.next().base_url.as_str(), 2))
            .collect();
        assert!(
            seen.contains(&0),
            "recovered node 0 must be selectable; got: {seen:?}"
        );
    }

    #[test]
    fn gonka_endpoint_all_failed_fallback_no_panic() {
        let pool = make_pool(3);
        for i in 0..3 {
            pool.mark_failed(i, Duration::from_hours(1));
        }
        for _ in 0..6 {
            let idx = url_to_idx(pool.next().base_url.as_str(), 3);
            assert!(idx < 3, "index out of range: {idx}");
        }
    }

    #[test]
    fn gonka_endpoint_all_failed_fallback_prefers_earliest_deadline() {
        let pool = make_pool(3);
        pool.mark_failed(0, Duration::from_hours(3));
        pool.mark_failed(1, Duration::from_hours(1));
        pool.mark_failed(2, Duration::from_hours(2));
        // With every node cooling down, the soonest-to-recover node wins.
        for _ in 0..4 {
            let (idx, node) = pool.next_indexed();
            assert_eq!(idx, 1, "fallback must pick the earliest deadline");
            assert_eq!(url_to_idx(node.base_url.as_str(), 3), 1);
            assert_eq!(url_to_idx(pool.next().base_url.as_str(), 3), 1);
        }
    }

    #[test]
    fn gonka_endpoint_next_indexed_index_matches_node() {
        let pool = make_pool(3);
        for _ in 0..6 {
            let (idx, node) = pool.next_indexed();
            assert_eq!(idx, url_to_idx(node.base_url.as_str(), 3));
            assert_eq!(node.address, format!("addr{idx}"));
        }
    }

    #[test]
    fn gonka_endpoint_next_indexed_feeds_mark_failed() {
        let pool = make_pool(3);
        let (failed, _) = pool.next_indexed();
        pool.mark_failed(failed, Duration::from_hours(1));
        for _ in 0..9 {
            let (idx, _) = pool.next_indexed();
            assert_ne!(idx, failed, "node {failed} is cooling down");
        }
    }

    #[test]
    fn gonka_endpoint_invalid_scheme_returns_err() {
        let result = EndpointPool::new(vec![GonkaEndpoint {
            base_url: "ftp://node.example".into(),
            address: "addr".into(),
        }]);
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("http or https scheme"),
            "unexpected error message: {msg}"
        );
    }

    #[test]
    fn gonka_endpoint_invalid_url_returns_err() {
        let result = EndpointPool::new(vec![GonkaEndpoint {
            base_url: "not a url".into(),
            address: "addr".into(),
        }]);
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("invalid endpoint URL"),
            "unexpected error message: {msg}"
        );
    }

    #[test]
    fn gonka_endpoint_empty_constructor_returns_err() {
        let result = EndpointPool::new(vec![]);
        match result {
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("EndpointPool requires at least one node"),
                    "unexpected error message: {msg}"
                );
            }
            Ok(_) => panic!("expected Err for empty pool, got Ok"),
        }
    }

    #[test]
    fn gonka_endpoint_len_and_is_empty() {
        let pool = make_pool(4);
        assert_eq!(pool.len(), 4);
        assert!(!pool.is_empty());
    }

    #[test]
    fn gonka_endpoint_mark_failed_out_of_range_noop() {
        let pool = make_pool(2);
        pool.mark_failed(99, Duration::from_secs(10));
        // Besides not panicking, no existing deadline may have been touched.
        assert!(
            pool.failed_until
                .iter()
                .all(|deadline| deadline.load(Ordering::Relaxed) == 0),
            "out-of-range index must leave every deadline untouched"
        );
    }

    #[test]
    fn gonka_endpoint_clear_failure_via_atomic_store() {
        let pool = make_pool(2);
        pool.mark_failed(0, Duration::from_hours(1));
        // Resetting the deadline to 0 makes the node healthy again.
        pool.failed_until[0].store(0, Ordering::Relaxed);
        let seen: Vec<usize> = (0..6)
            .map(|_| url_to_idx(pool.next().base_url.as_str(), 2))
            .collect();
        assert!(
            seen.contains(&0),
            "node 0 must be selectable after atomic clear"
        );
    }
}