use std::time::{Duration, Instant};
use url::Url;
use crate::error::Error;
/// A seed endpoint URL.
///
/// Values produced by [`Endpoint::parse`] are guaranteed to use the `http` or
/// `https` scheme and to carry a host. The field is `pub(crate)`, so code inside
/// the crate can also build an `Endpoint` directly without those checks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Endpoint {
/// The parsed URL backing this endpoint.
pub(crate) url: Url,
}
impl Endpoint {
pub fn parse(raw: &str) -> Result<Self, Error> {
let url = Url::parse(raw)
.map_err(|e| Error::Validation(format!("invalid seed endpoint `{raw}`: {e}")))?;
if url.scheme() != "http" && url.scheme() != "https" {
return Err(Error::Validation(format!(
"seed endpoint `{raw}` must be http or https, got `{}`",
url.scheme()
)));
}
if url.host_str().is_none() {
return Err(Error::Validation(format!(
"seed endpoint `{raw}` is missing a host"
)));
}
Ok(Self { url })
}
pub fn url(&self) -> &Url {
&self.url
}
pub(crate) fn join_api(&self, path: &str) -> Result<Url, Error> {
debug_assert!(path.starts_with('/'), "seed path must be absolute");
let full = format!("/api/v1{path}");
self.url
.join(&full)
.map_err(|e| Error::Validation(format!("bad seed path `{path}`: {e}")))
}
pub(crate) fn key(&self) -> String {
let s = self.url.as_str();
s.trim_end_matches('/').to_owned()
}
}
/// Health classification of a peer, from most to least preferred.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
    /// Most preferred state (rank 0).
    Healthy,
    /// Intermediate state (rank 1).
    Degraded,
    /// Least preferred state (rank 2).
    Unhealthy,
}

impl PeerState {
    /// Numeric preference weight used for ordering: a lower value is preferred.
    fn rank(self) -> u8 {
        match self {
            Self::Healthy => 0,
            Self::Degraded => 1,
            Self::Unhealthy => 2,
        }
    }
}
/// Failure classification passed to `PeerSet::mark_failure`.
///
/// In this file only `ServiceUnavailable` is handled specially: it marks the peer
/// `Unhealthy` immediately. Every other class marks the peer `Degraded` until it
/// has accumulated three consecutive failures, at which point it becomes
/// `Unhealthy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerErrorClass {
/// Presumably a transport-level failure; the classification is made by callers
/// outside this file — TODO confirm.
Network,
/// Presumably a request that timed out — TODO confirm against the caller.
Timeout,
/// Presumably an HTTP 5xx response other than the one mapped to
/// `ServiceUnavailable` — TODO confirm against the caller.
Server5xx,
/// Failure that marks the peer `Unhealthy` on the first occurrence; the unit
/// tests in this file refer to it as a 503.
ServiceUnavailable,
}
/// One seed peer together with the health and latency bookkeeping that
/// `PeerSet` maintains for it.
#[derive(Debug, Clone)]
pub struct Peer {
/// Position of this peer in the endpoint list it was created from; used as the
/// final tie-break when peers are ordered.
pub list_index: usize,
/// The endpoint this peer talks to.
pub endpoint: Endpoint,
/// Current health classification; starts as `PeerState::Healthy`.
pub state: PeerState,
/// Exponential moving average of observed latency in milliseconds, or `None`
/// until the first success has been recorded.
pub latency_ema_ms: Option<f64>,
/// When a success or failure was last recorded for this peer, or `None` if
/// nothing has been recorded yet.
pub last_used_at: Option<Instant>,
/// Number of failures recorded since the last success (saturating).
pub consecutive_failures: u32,
}
impl Peer {
fn new(list_index: usize, endpoint: Endpoint) -> Self {
Self {
list_index,
endpoint,
state: PeerState::Healthy,
latency_ema_ms: None,
last_used_at: None,
consecutive_failures: 0,
}
}
fn sort_key(&self) -> (u8, u64, usize) {
let ema = self
.latency_ema_ms
.map(|v| (v.max(0.0) * 1_000.0) as u64)
.unwrap_or(u64::MAX / 2);
(self.state.rank(), ema, self.list_index)
}
}
/// A collection of peers with selection and health-tracking helpers.
///
/// Both constructors in this file (`single` and `new`) produce a set with at
/// least one peer, and no method shown here removes peers.
#[derive(Debug, Clone)]
pub struct PeerSet {
/// Peers in the order their endpoints were supplied.
peers: Vec<Peer>,
}
impl PeerSet {
pub fn single(endpoint: Endpoint) -> Self {
Self {
peers: vec![Peer::new(0, endpoint)],
}
}
pub fn new(endpoints: Vec<Endpoint>) -> Result<Self, Error> {
if endpoints.is_empty() {
return Err(Error::Validation(
"PeerSet requires at least one endpoint".into(),
));
}
let peers = endpoints
.into_iter()
.enumerate()
.map(|(i, ep)| Peer::new(i, ep))
.collect();
Ok(Self { peers })
}
pub fn try_from_many(endpoints: Vec<Endpoint>) -> Result<Self, Error> {
Self::new(endpoints)
}
pub fn primary(&self) -> &Endpoint {
&self.peers[0].endpoint
}
pub fn len(&self) -> usize {
self.peers.len()
}
pub fn is_empty(&self) -> bool {
self.peers.is_empty()
}
pub fn is_mesh(&self) -> bool {
self.peers.len() > 1
}
pub fn iter(&self) -> impl Iterator<Item = &Endpoint> {
self.peers.iter().map(|p| &p.endpoint)
}
pub fn peers(&self) -> &[Peer] {
&self.peers
}
pub fn pick(&self) -> &Peer {
let mut best: Option<&Peer> = None;
for p in &self.peers {
match best {
None => best = Some(p),
Some(current) if p.sort_key() < current.sort_key() => best = Some(p),
_ => {}
}
}
best.expect("PeerSet invariant: at least one peer")
}
pub fn find_by_key(&self, wanted_key: &str) -> Option<&Peer> {
self.peers.iter().find(|p| p.endpoint.key() == wanted_key)
}
pub fn pick_local_first(&self) -> &Peer {
self.peers
.iter()
.filter(|p| p.state != PeerState::Unhealthy)
.min_by_key(|p| p.list_index)
.unwrap_or_else(|| self.pick())
}
pub fn pick_random(&self) -> &Peer {
let candidates: Vec<&Peer> = self
.peers
.iter()
.filter(|p| p.state != PeerState::Unhealthy)
.collect();
if candidates.is_empty() {
return self.pick();
}
let seed = Instant::now().elapsed().subsec_nanos() as usize;
let idx = seed % candidates.len();
candidates[idx]
}
pub fn rediscover(&mut self) {
for p in &mut self.peers {
p.state = PeerState::Healthy;
p.consecutive_failures = 0;
p.latency_ema_ms = None;
p.last_used_at = None;
}
}
pub fn next_after(&self, failed: &Peer) -> Option<&Peer> {
let mut best: Option<&Peer> = None;
for p in &self.peers {
if p.list_index == failed.list_index {
continue;
}
match best {
None => best = Some(p),
Some(current) if p.sort_key() < current.sort_key() => best = Some(p),
_ => {}
}
}
best
}
pub fn mark_success(&mut self, peer_key: &str, latency: Duration) {
if let Some(p) = self.peer_mut(peer_key) {
let ms = latency.as_secs_f64() * 1_000.0;
p.latency_ema_ms = Some(match p.latency_ema_ms {
None => ms,
Some(prev) => 0.8 * prev + 0.2 * ms,
});
p.consecutive_failures = 0;
p.state = PeerState::Healthy;
p.last_used_at = Some(Instant::now());
}
}
pub fn mark_failure(&mut self, peer_key: &str, class: PeerErrorClass) {
if let Some(p) = self.peer_mut(peer_key) {
p.consecutive_failures = p.consecutive_failures.saturating_add(1);
p.last_used_at = Some(Instant::now());
p.state = match class {
PeerErrorClass::ServiceUnavailable => PeerState::Unhealthy,
_ => {
if p.consecutive_failures >= 3 {
PeerState::Unhealthy
} else {
PeerState::Degraded
}
}
};
}
}
fn peer_mut(&mut self, peer_key: &str) -> Option<&mut Peer> {
self.peers.iter_mut().find(|p| p.endpoint.key() == peer_key)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `url` into an `Endpoint`, panicking on invalid input.
    fn ep(url: &str) -> Endpoint {
        Endpoint::parse(url).unwrap()
    }

    #[test]
    fn parse_rejects_ws() {
        let err = Endpoint::parse("ws://x:1").unwrap_err();
        assert!(matches!(err, Error::Validation(_)));
    }

    #[test]
    fn parse_rejects_garbage() {
        let err = Endpoint::parse("not a url").unwrap_err();
        assert!(matches!(err, Error::Validation(_)));
    }

    #[test]
    fn parse_accepts_https() {
        let ep = Endpoint::parse("https://cognitum.local:8443").unwrap();
        assert_eq!(ep.url().scheme(), "https");
        assert_eq!(ep.url().host_str(), Some("cognitum.local"));
        assert_eq!(ep.url().port(), Some(8443));
    }

    #[test]
    fn join_api_builds_full_url() {
        let ep = Endpoint::parse("https://cognitum.local:8443").unwrap();
        let url = ep.join_api("/status").unwrap();
        assert_eq!(url.as_str(), "https://cognitum.local:8443/api/v1/status");
    }

    #[test]
    fn single_peerset_is_len_one() {
        let ps = PeerSet::single(ep("https://seed:8443"));
        assert_eq!(ps.len(), 1);
        assert!(!ps.is_mesh());
    }

    #[test]
    fn new_rejects_empty() {
        let err = PeerSet::new(vec![]).unwrap_err();
        assert!(matches!(err, Error::Validation(_)));
    }

    #[test]
    fn pick_prefers_lower_list_index_when_no_latency() {
        let ps = PeerSet::new(vec![
            ep("https://a:8443"),
            ep("https://b:8443"),
            ep("https://c:8443"),
        ])
        .unwrap();
        let picked = ps.pick();
        assert_eq!(picked.list_index, 0);
    }

    #[test]
    fn pick_prefers_lower_latency_ema() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        ps.mark_success("https://a:8443", Duration::from_millis(100));
        ps.mark_success("https://b:8443", Duration::from_millis(10));
        let picked = ps.pick();
        assert_eq!(picked.list_index, 1);
    }

    #[test]
    fn next_after_cycles() {
        let ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        let first = ps.pick();
        let next = ps.next_after(first).expect("second peer");
        assert_ne!(first.list_index, next.list_index);
    }

    #[test]
    fn next_after_single_peer_returns_none() {
        let ps = PeerSet::new(vec![ep("https://a:8443")]).unwrap();
        let peer = ps.pick();
        assert!(ps.next_after(peer).is_none());
    }

    #[test]
    fn mark_failure_degrades_then_unhealthy() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        let key = "https://a:8443";
        ps.mark_failure(key, PeerErrorClass::Network);
        assert_eq!(ps.find_by_key(key).unwrap().state, PeerState::Degraded);
        ps.mark_failure(key, PeerErrorClass::Network);
        ps.mark_failure(key, PeerErrorClass::Network);
        assert_eq!(ps.find_by_key(key).unwrap().state, PeerState::Unhealthy);
    }

    #[test]
    fn mark_failure_503_is_immediate_unhealthy() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        ps.mark_failure("https://a:8443", PeerErrorClass::ServiceUnavailable);
        assert_eq!(
            ps.find_by_key("https://a:8443").unwrap().state,
            PeerState::Unhealthy
        );
    }

    #[test]
    fn unhealthy_peers_skipped_but_still_pickable_when_all_unhealthy() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        ps.mark_failure("https://a:8443", PeerErrorClass::ServiceUnavailable);
        assert_eq!(ps.pick().list_index, 1);
        ps.mark_failure("https://b:8443", PeerErrorClass::ServiceUnavailable);
        // Both peers now share the same state rank and have no latency sample,
        // so the list index decides. Accepting "0 or 1" would be vacuous for a
        // two-peer set; pin the deterministic tie-break instead.
        assert_eq!(ps.pick().list_index, 0);
    }

    #[test]
    fn mark_success_clears_unhealthy() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443")]).unwrap();
        ps.mark_failure("https://a:8443", PeerErrorClass::Network);
        ps.mark_failure("https://a:8443", PeerErrorClass::Network);
        ps.mark_failure("https://a:8443", PeerErrorClass::Network);
        assert_eq!(
            ps.find_by_key("https://a:8443").unwrap().state,
            PeerState::Unhealthy
        );
        ps.mark_success("https://a:8443", Duration::from_millis(5));
        assert_eq!(
            ps.find_by_key("https://a:8443").unwrap().state,
            PeerState::Healthy
        );
    }

    #[test]
    fn endpoint_key_strips_trailing_slash() {
        let a = Endpoint::parse("https://a:8443/").unwrap();
        let b = Endpoint::parse("https://a:8443").unwrap();
        assert_eq!(a.key(), b.key());
    }

    #[test]
    fn pick_local_first_skips_unhealthy() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        assert_eq!(ps.pick_local_first().list_index, 0);
        ps.mark_failure("https://a:8443", PeerErrorClass::ServiceUnavailable);
        assert_eq!(ps.pick_local_first().list_index, 1);
    }

    #[test]
    fn pick_random_skips_unhealthy_when_alternative_exists() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443"), ep("https://b:8443")]).unwrap();
        ps.mark_failure("https://a:8443", PeerErrorClass::ServiceUnavailable);
        for _ in 0..32 {
            assert_eq!(ps.pick_random().list_index, 1);
        }
    }

    #[test]
    fn rediscover_resets_bookkeeping() {
        let mut ps = PeerSet::new(vec![ep("https://a:8443")]).unwrap();
        ps.mark_success("https://a:8443", Duration::from_millis(5));
        ps.mark_failure("https://a:8443", PeerErrorClass::ServiceUnavailable);
        ps.rediscover();
        let peer = ps.find_by_key("https://a:8443").unwrap();
        assert_eq!(peer.state, PeerState::Healthy);
        assert_eq!(peer.consecutive_failures, 0);
        assert!(peer.latency_ema_ms.is_none());
        assert!(peer.last_used_at.is_none());
    }
}