use futures::future::join_all;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use crate::error::ShardError;
use crate::types::{EdgeDirectoryResponse, EdgeHealth, EdgeInfo};
/// Built-in edge base URLs (scheme + host, no trailing slash).
///
/// NOTE(review): presumably the bootstrap list handed to `EdgeSelector::new`
/// when the caller configures no edges of its own — the consumer lives
/// outside this file, confirm there.
pub(crate) const DEFAULT_EDGES: &[&str] = &[
    "https://use1.api.shardd.xyz",
    "https://euc1.api.shardd.xyz",
    "https://ape1.api.shardd.xyz",
];
/// Largest `sync_gap` an edge may report in its health response and still be
/// considered selectable; anything strictly above this is rejected.
const MAX_ACCEPTABLE_SYNC_GAP: u64 = 100;

/// How long (in milliseconds) an edge is kept in cooldown after a reported
/// failure: one minute.
const COOLDOWN_MS: u64 = 60 * 1_000;

/// Time budget (in milliseconds) for a single health probe: two seconds.
const PROBE_TIMEOUT_MS: u64 = 2 * 1_000;
/// One edge endpoint together with what is currently known about it.
#[derive(Debug, Clone)]
pub(crate) struct Candidate {
    /// Base URL of the edge, used as its identity when matching candidates.
    pub base_url: String,
    /// Round-trip time in milliseconds recorded by a probe, or `None` when
    /// no usable measurement is available.
    pub rtt_ms: Option<u64>,
    /// Instant until which this edge should be avoided; `None` means the
    /// edge is not in cooldown.
    pub cooldown_until: Option<Instant>,
}

impl Candidate {
    /// Builds a candidate for `base_url` with no measurement and no cooldown.
    fn new(base_url: String) -> Self {
        Candidate {
            base_url,
            rtt_ms: None,
            cooldown_until: None,
        }
    }

    /// Returns `true` while the candidate is *cooling down*, i.e. its
    /// cooldown deadline lies strictly after `now`.
    ///
    /// Despite the name, `true` means "avoid this edge for now"; a candidate
    /// without a deadline, or whose deadline has been reached, yields `false`.
    fn is_cool(&self, now: Instant) -> bool {
        matches!(self.cooldown_until, Some(deadline) if deadline > now)
    }
}
/// Registry of edge endpoints that tracks probe latency and failure cooldowns.
///
/// All state sits behind a `std::sync::Mutex`. The lock is only taken for
/// short synchronous sections and is never held across an `.await`.
///
/// # Panics
///
/// Every method panics if the mutex has been poisoned by a panic in another
/// thread while it held the lock.
pub(crate) struct EdgeSelector {
    inner: Mutex<Inner>,
}

/// State guarded by the selector's mutex.
struct Inner {
    /// Known edges. After `probe_all` they are ordered best-first: edges not
    /// in cooldown before cooling ones, then by ascending round-trip time.
    candidates: Vec<Candidate>,
    /// Becomes `true` once the first `probe_all` round has finished.
    initialized: bool,
}

impl EdgeSelector {
    /// Creates a selector seeded with the `bootstrap` base URLs, in the given
    /// order. Nothing is probed until `probe_all` is called.
    pub(crate) fn new(bootstrap: Vec<String>) -> Self {
        let candidates = bootstrap.into_iter().map(Candidate::new).collect();
        Self {
            inner: Mutex::new(Inner {
                candidates,
                initialized: false,
            }),
        }
    }

    /// Returns the base URLs of all edges that are not currently in cooldown,
    /// in the selector's current order.
    pub(crate) fn live_urls(&self) -> Vec<String> {
        let now = Instant::now();
        let guard = self.inner.lock().unwrap();
        guard
            .candidates
            .iter()
            .filter(|c| !c.is_cool(now))
            .map(|c| c.base_url.clone())
            .collect()
    }

    /// Reports whether a probe round is needed: either none has completed
    /// yet, or every known edge is in cooldown (which includes the case of
    /// having no edges at all).
    pub(crate) fn needs_probe(&self) -> bool {
        let now = Instant::now();
        let guard = self.inner.lock().unwrap();
        !guard.initialized || guard.candidates.iter().all(|c| c.is_cool(now))
    }

    /// Puts every candidate whose URL equals `base_url` into cooldown for
    /// `COOLDOWN_MS` milliseconds from now. Unknown URLs are ignored.
    pub(crate) fn mark_failure(&self, base_url: &str) {
        let mut guard = self.inner.lock().unwrap();
        let until = Instant::now() + Duration::from_millis(COOLDOWN_MS);
        for c in &mut guard.candidates {
            if c.base_url == base_url {
                c.cooldown_until = Some(until);
            }
        }
    }

    /// Clears the cooldown of every candidate whose URL equals `base_url`.
    /// Unknown URLs are ignored.
    pub(crate) fn mark_success(&self, base_url: &str) {
        let mut guard = self.inner.lock().unwrap();
        for c in &mut guard.candidates {
            if c.base_url == base_url {
                c.cooldown_until = None;
            }
        }
    }

    /// Replaces the candidate list with the edges in `fresh`, in `fresh`
    /// order, carrying over the latency and cooldown state of edges that
    /// were already known.
    ///
    /// An empty `fresh` leaves the current list — including its ordering —
    /// untouched, so an empty directory response cannot wipe or shuffle the
    /// selector. URLs repeated within `fresh` are only added once.
    // NOTE(review): `dead_code` is allowed presumably because no caller in
    // the crate uses this yet — confirm and drop the attribute once wired up.
    #[allow(dead_code)]
    pub(crate) fn replace_directory(&self, fresh: Vec<EdgeInfo>) {
        if fresh.is_empty() {
            return;
        }
        let mut guard = self.inner.lock().unwrap();
        let mut known: std::collections::HashMap<String, Candidate> = guard
            .candidates
            .drain(..)
            .map(|c| (c.base_url.clone(), c))
            .collect();
        let mut seen = std::collections::HashSet::new();
        for edge in fresh {
            let base_url = edge.base_url;
            if !seen.insert(base_url.clone()) {
                // Already added from an earlier entry of `fresh`.
                continue;
            }
            let candidate = known
                .remove(&base_url)
                .unwrap_or_else(|| Candidate::new(base_url));
            guard.candidates.push(candidate);
        }
    }

    /// Probes every known edge concurrently and re-ranks the candidate list.
    ///
    /// An edge whose probe succeeds and passes `is_selectable` gets its
    /// round-trip time recorded and its cooldown cleared; any other outcome
    /// clears its round-trip time and leaves its cooldown as it was. The list
    /// is then sorted: edges not in cooldown first, then ascending round-trip
    /// time, with unmeasured edges last.
    ///
    /// Results are matched to candidates by base URL rather than by position,
    /// because the list may be replaced or re-sorted by another caller while
    /// the probes are in flight. Edges added in the meantime are left as-is.
    ///
    /// # Errors
    ///
    /// Returns `ShardError::ServiceUnavailable` when no edges are configured.
    pub(crate) async fn probe_all(&self, http: &reqwest::Client) -> Result<(), ShardError> {
        // Snapshot the URLs so the std mutex is released before awaiting.
        let urls: Vec<String> = {
            let guard = self.inner.lock().unwrap();
            guard
                .candidates
                .iter()
                .map(|c| c.base_url.clone())
                .collect()
        };
        if urls.is_empty() {
            return Err(ShardError::ServiceUnavailable("no edges configured".into()));
        }

        let probes = urls.iter().map(|url| probe_one(http, url.clone()));
        let results = join_all(probes).await;

        // `join_all` yields results in input order, so zipping with `urls`
        // pairs each outcome with the URL that was probed.
        let outcomes: std::collections::HashMap<String, Option<u64>> = urls
            .into_iter()
            .zip(results)
            .map(|(url, result)| {
                let rtt_ms = match result {
                    Ok((rtt_ms, health)) if is_selectable(&health) => Some(rtt_ms),
                    _ => None,
                };
                (url, rtt_ms)
            })
            .collect();

        let now = Instant::now();
        let mut guard = self.inner.lock().unwrap();
        guard.initialized = true;
        for c in &mut guard.candidates {
            match outcomes.get(&c.base_url) {
                Some(Some(rtt_ms)) => {
                    c.rtt_ms = Some(*rtt_ms);
                    c.cooldown_until = None;
                }
                Some(None) => c.rtt_ms = None,
                // Not part of this probe round; keep whatever is recorded.
                None => {}
            }
        }
        // Stable sort; `false < true` puts edges that are not cooling first.
        guard
            .candidates
            .sort_by_key(|c| (c.is_cool(now), c.rtt_ms.unwrap_or(u64::MAX)));
        Ok(())
    }
}
/// Probes a single edge by requesting `{base_url}/gateway/health`.
///
/// On success returns the elapsed time in milliseconds — measured from just
/// before the request is sent until the JSON body has been decoded — together
/// with the decoded [`EdgeHealth`].
///
/// The whole exchange (sending the request, receiving the response, and
/// reading and decoding the body) shares one `PROBE_TIMEOUT_MS` budget, so an
/// edge that answers with headers but then stalls on the body cannot hold up
/// the caller indefinitely.
///
/// # Errors
///
/// Returns `Err(())` on timeout, transport failure, a non-success HTTP
/// status, or a body that does not decode as `EdgeHealth`. The cause is
/// deliberately not reported; callers only need to know the probe failed.
async fn probe_one(http: &reqwest::Client, base_url: String) -> Result<(u64, EdgeHealth), ()> {
    let url = format!("{}/gateway/health", base_url.trim_end_matches('/'));
    let start = Instant::now();

    let exchange = async {
        let resp = http.get(&url).send().await.map_err(|_| ())?;
        if !resp.status().is_success() {
            return Err(());
        }
        resp.json::<EdgeHealth>().await.map_err(|_| ())
    };
    // Outer `?` handles the timeout, inner `?` the exchange's own failure.
    let health = tokio::time::timeout(Duration::from_millis(PROBE_TIMEOUT_MS), exchange)
        .await
        .map_err(|_| ())??;

    // Saturate instead of truncating if the u128 millisecond count ever
    // exceeded u64::MAX.
    let rtt_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
    Ok((rtt_ms, health))
}
/// Decides whether an edge reporting `health` may be selected.
///
/// An edge qualifies when it reports `ready`, does not explicitly report
/// `overloaded == Some(true)`, and does not report a `sync_gap` strictly
/// greater than `MAX_ACCEPTABLE_SYNC_GAP`. Missing (`None`) `overloaded` or
/// `sync_gap` values do not disqualify the edge.
fn is_selectable(health: &EdgeHealth) -> bool {
    let overloaded = matches!(health.overloaded, Some(true));
    let lagging = matches!(health.sync_gap, Some(gap) if gap > MAX_ACCEPTABLE_SYNC_GAP);
    health.ready && !overloaded && !lagging
}
/// Fetches the edge directory from `{base_url}/gateway/edges`.
///
/// Trailing slashes on `base_url` are stripped before the path is appended.
///
/// # Errors
///
/// * `ShardError::Network` — the request could not be sent or no response
///   was received.
/// * `ShardError::ServiceUnavailable` — the edge answered with a non-success
///   HTTP status; the message includes the status code.
/// * `ShardError::Decode` — the response body could not be decoded as an
///   `EdgeDirectoryResponse`.
pub(crate) async fn fetch_directory(
    http: &reqwest::Client,
    base_url: &str,
) -> Result<EdgeDirectoryResponse, ShardError> {
    let endpoint = format!("{}/gateway/edges", base_url.trim_end_matches('/'));
    let response = http
        .get(&endpoint)
        .send()
        .await
        .map_err(|err| ShardError::Network(err.to_string()))?;

    let status = response.status();
    if status.is_success() {
        response
            .json::<EdgeDirectoryResponse>()
            .await
            .map_err(|err| ShardError::Decode(err.to_string()))
    } else {
        Err(ShardError::ServiceUnavailable(format!(
            "edges fetch returned HTTP {}",
            status.as_u16()
        )))
    }
}