use std::collections::HashMap;
use crate::ids::NodeSiteId;
/// Numeric identifier of a chain, as produced by [`chain_id_from_targets`].
pub type ChainId = u64;

/// Hashes `chain_targets` into a [`ChainId`].
///
/// The hash is 64-bit FNV-1a over the string's UTF-8 bytes: starting from the
/// offset basis, each byte is XOR-ed into the state, which is then multiplied
/// (wrapping) by the FNV prime. An empty string therefore maps to the offset
/// basis itself.
pub fn chain_id_from_targets(chain_targets: &str) -> ChainId {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    chain_targets
        .bytes()
        .fold(FNV_OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
}
/// Map key naming one hop of one chain: the chain's id paired with a hop index.
///
/// `RttTrackerEntry::per_edge_stats` is keyed by this type.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct EdgeKey {
/// Chain the hop belongs to.
pub chain_id: ChainId,
/// Index of the hop within the chain.
/// NOTE(review): whether indices start at 0 or 1 is not visible here — confirm with callers.
pub hop_index: u8,
}
/// Integer exponentially-weighted round-trip-time estimator.
///
/// Tracks a smoothed RTT and a smoothed mean deviation using power-of-two
/// gains, in the same shape as the RFC 6298 retransmission-timer estimator.
/// Field names carry an `_ns` suffix; the arithmetic itself is unit-agnostic.
#[derive(Clone, Copy, Debug, Default)]
pub struct RttEma {
    /// Smoothed round-trip time.
    pub srtt_ns: u64,
    /// Smoothed absolute deviation of samples from `srtt_ns`.
    pub rttvar_ns: u64,
    /// Number of samples folded in so far (saturates at `u64::MAX`).
    pub sample_count: u64,
}

impl RttEma {
    /// Folds in one sample with the default gains: 1/8 for the smoothed RTT
    /// (`alpha_shift = 3`) and 1/4 for the deviation (`beta_shift = 2`).
    pub fn observe(&mut self, sample_ns: u64) {
        self.observe_with_alpha_beta(sample_ns, 3, 2);
    }

    /// Folds in one sample with gains `2^-alpha_shift` (smoothed RTT) and
    /// `2^-beta_shift` (deviation).
    ///
    /// The first sample seeds the estimator: `srtt = sample`,
    /// `rttvar = sample / 2`. Every later sample applies
    ///
    /// ```text
    /// rttvar = rttvar - rttvar * 2^-beta  + |sample - srtt| * 2^-beta
    /// srtt   = srtt   - srtt   * 2^-alpha + sample          * 2^-alpha
    /// ```
    ///
    /// with the deviation updated first, so it sees the previous `srtt`.
    ///
    /// A shift of 64 or more is treated as a gain of zero (the corresponding
    /// estimate is left unchanged) instead of overflowing the shift, which
    /// would panic in overflow-checked builds.
    pub fn observe_with_alpha_beta(&mut self, sample_ns: u64, alpha_shift: u8, beta_shift: u8) {
        if self.sample_count == 0 {
            self.srtt_ns = sample_ns;
            self.rttvar_ns = sample_ns >> 1;
        } else {
            // `value * 2^-shift`, rounding down; 0 once the shift exceeds the word size.
            let scaled = |value: u64, shift: u8| value.checked_shr(u32::from(shift)).unwrap_or(0);

            // Neither sum can overflow: `x - scaled(x) + scaled(y)` never exceeds max(x, y).
            let delta = sample_ns.abs_diff(self.srtt_ns);
            self.rttvar_ns =
                self.rttvar_ns - scaled(self.rttvar_ns, beta_shift) + scaled(delta, beta_shift);
            self.srtt_ns =
                self.srtt_ns - scaled(self.srtt_ns, alpha_shift) + scaled(sample_ns, alpha_shift);
        }
        self.sample_count = self.sample_count.saturating_add(1);
    }

    /// Returns `srtt + 4 * rttvar`, saturating at `u64::MAX`.
    pub fn budget_ns(&self) -> u64 {
        self.srtt_ns
            .saturating_add(self.rttvar_ns.saturating_mul(4))
    }

    /// Returns `true` once at least three samples have been observed.
    pub fn is_warm(&self) -> bool {
        self.sample_count >= 3
    }
}
/// Sliding window of heartbeat inter-arrival times plus the two phi
/// thresholds used to grade how overdue the next heartbeat is.
#[derive(Clone, Debug)]
pub struct PhiAccrualState {
    /// Most recent inter-arrival gaps, oldest at the front.
    pub inter_arrival_history: std::collections::VecDeque<u64>,
    /// Maximum number of gaps retained in `inter_arrival_history`.
    pub history_capacity: usize,
    /// Phi strictly above this value counts as suspect.
    pub threshold_phi: f64,
    /// Phi strictly above this value counts as down.
    pub down_phi: f64,
}

impl Default for PhiAccrualState {
    /// Empty history, room for 1000 gaps, suspect above phi 8.0, down above phi 16.0.
    fn default() -> Self {
        Self {
            inter_arrival_history: std::collections::VecDeque::new(),
            history_capacity: 1000,
            threshold_phi: 8.0,
            down_phi: 16.0,
        }
    }
}

impl PhiAccrualState {
    /// Records the gap `now_ns - last_seen_at_ns` (saturating at 0) as one
    /// inter-arrival sample.
    ///
    /// `last_seen_at_ns == 0` means "never seen before" and records nothing.
    ///
    /// The window never holds more than `history_capacity` samples after this
    /// call: the oldest samples are evicted until there is room, which also
    /// trims the window if `history_capacity` was lowered since the last call.
    /// With a capacity of 0 nothing is stored at all.
    pub fn record_heartbeat(&mut self, now_ns: u64, last_seen_at_ns: u64) {
        if last_seen_at_ns == 0 {
            return;
        }
        if self.history_capacity == 0 {
            self.inter_arrival_history.clear();
            return;
        }
        // `>=` rather than `==`: a window that is already over capacity must
        // still shrink, otherwise it would grow without bound from then on.
        while self.inter_arrival_history.len() >= self.history_capacity {
            self.inter_arrival_history.pop_front();
        }
        self.inter_arrival_history
            .push_back(now_ns.saturating_sub(last_seen_at_ns));
    }

    /// Returns the suspicion level for a peer last heard from at
    /// `last_seen_at_ns`, evaluated at `now_ns`.
    ///
    /// The value is `elapsed / (mean * ln 10)`, i.e. `-log10(exp(-elapsed / mean))`,
    /// where `mean` is the average recorded inter-arrival gap and `elapsed` is
    /// `now_ns - last_seen_at_ns` (saturating at 0).
    ///
    /// Returns `0.0` when there is no history, when `last_seen_at_ns == 0`, or
    /// when the mean gap is zero.
    pub fn phi(&self, now_ns: u64, last_seen_at_ns: u64) -> f64 {
        if self.inter_arrival_history.is_empty() || last_seen_at_ns == 0 {
            return 0.0;
        }
        let elapsed = now_ns.saturating_sub(last_seen_at_ns) as f64;
        let sum: f64 = self.inter_arrival_history.iter().map(|&gap| gap as f64).sum();
        let mean = sum / self.inter_arrival_history.len() as f64;
        if mean <= 0.0 {
            return 0.0;
        }
        elapsed / (mean * std::f64::consts::LN_10)
    }

    /// Returns `true` when [`phi`](Self::phi) is strictly greater than `threshold_phi`.
    pub fn is_suspect(&self, now_ns: u64, last_seen_at_ns: u64) -> bool {
        self.phi(now_ns, last_seen_at_ns) > self.threshold_phi
    }

    /// Returns `true` when [`phi`](Self::phi) is strictly greater than `down_phi`.
    pub fn is_down(&self, now_ns: u64, last_seen_at_ns: u64) -> bool {
        self.phi(now_ns, last_seen_at_ns) > self.down_phi
    }
}
/// Everything `RttTracker` keeps for a single site.
#[derive(Default)]
pub struct RttTrackerEntry {
/// Estimator fed by every round trip observed for this site, with or without a chain context.
pub site_stats: RttEma,
/// Per-(chain, hop) estimators, fed only by round trips observed with a chain context.
pub per_edge_stats: HashMap<EdgeKey, RttEma>,
/// Estimator values reported by this site, keyed by (next hop, chain).
/// `RttTracker::ingest_reported_outgoing` overwrites the stored values wholesale.
pub reported_outgoing: HashMap<(NodeSiteId, ChainId), RttEma>,
/// Inter-arrival history and thresholds used by `RttTracker::scan_phi`.
pub phi_accrual: PhiAccrualState,
/// `now_ns` of the most recent observed round trip; 0 (the default) is treated as "never seen".
pub last_seen_at_ns: u64,
/// `now_ns` of the most recent observed round trip. Written by
/// `RttTracker::observe_round_trip`; not read anywhere in the visible source.
pub last_updated_at_ns: u64,
/// State recorded by the most recent `RttTracker::scan_phi` that saw a change (defaults to `Live`).
pub last_phi_state: PhiState,
}
/// Round-trip statistics per site, with per-chain and global fallbacks.
#[derive(Default)]
pub struct RttTracker {
// Per-site state; an entry is created the first time a site is observed or reported on.
entries: HashMap<NodeSiteId, RttTrackerEntry>,
// One estimator per chain, fed by every round trip observed with that chain id, whatever the site.
chain_priors: HashMap<ChainId, RttEma>,
// Estimator fed by every observed round trip, using shifts (6, 5) instead of the default (3, 2).
global_prior: RttEma,
}
/// Optional chain context attached to an estimate or an observation.
///
/// `RttTracker` turns it into an `EdgeKey` with the same two fields and uses
/// `chain_id` on its own to look up the chain prior.
#[derive(Clone, Copy, Debug)]
pub struct ChainContext {
/// Chain the round trip belongs to.
pub chain_id: ChainId,
/// Index of the hop within that chain.
/// NOTE(review): indexing base (0 or 1) is not visible here — confirm with callers.
pub hop_index: u8,
}
impl RttTracker {
pub fn new() -> Self {
Self::default()
}
pub fn estimate_budget_ns(
&self,
site: NodeSiteId,
chain: Option<ChainContext>,
static_default_ns: u64,
) -> u64 {
if let (Some(ctx), Some(entry)) = (chain, self.entries.get(&site)) {
let key = EdgeKey {
chain_id: ctx.chain_id,
hop_index: ctx.hop_index,
};
if let Some(ema) = entry.per_edge_stats.get(&key) {
if ema.is_warm() {
return ema.budget_ns();
}
}
}
if let Some(entry) = self.entries.get(&site) {
if entry.site_stats.is_warm() {
return entry.site_stats.budget_ns();
}
}
if let Some(ctx) = chain {
if let Some(prior) = self.chain_priors.get(&ctx.chain_id) {
if prior.is_warm() {
return prior.budget_ns();
}
}
}
if self.global_prior.is_warm() {
return self.global_prior.budget_ns();
}
static_default_ns
}
pub fn observe_round_trip(
&mut self,
site: NodeSiteId,
chain: Option<ChainContext>,
elapsed_ns: u64,
now_ns: u64,
) {
let entry = self.entries.entry(site).or_default();
entry.site_stats.observe(elapsed_ns);
entry.last_updated_at_ns = now_ns;
entry
.phi_accrual
.record_heartbeat(now_ns, entry.last_seen_at_ns);
entry.last_seen_at_ns = now_ns;
if let Some(ctx) = chain {
let key = EdgeKey {
chain_id: ctx.chain_id,
hop_index: ctx.hop_index,
};
entry
.per_edge_stats
.entry(key)
.or_default()
.observe(elapsed_ns);
self.chain_priors
.entry(ctx.chain_id)
.or_default()
.observe(elapsed_ns);
}
self.global_prior.observe_with_alpha_beta(elapsed_ns, 6, 5);
}
pub fn ingest_reported_outgoing(
&mut self,
from_site: NodeSiteId,
next_hop: NodeSiteId,
chain_id: ChainId,
srtt_ns: u64,
rttvar_ns: u64,
sample_count: u64,
) {
let entry = self.entries.entry(from_site).or_default();
let report = entry
.reported_outgoing
.entry((next_hop, chain_id))
.or_default();
report.srtt_ns = srtt_ns;
report.rttvar_ns = rttvar_ns;
report.sample_count = sample_count;
}
pub fn entry(&self, site: NodeSiteId) -> Option<&RttTrackerEntry> {
self.entries.get(&site)
}
pub fn chain_prior(&self, chain_id: ChainId) -> Option<&RttEma> {
self.chain_priors.get(&chain_id)
}
pub fn global_prior(&self) -> &RttEma {
&self.global_prior
}
pub fn tracked_sites(&self) -> impl Iterator<Item = NodeSiteId> + '_ {
self.entries.keys().copied()
}
pub fn scan_phi(&mut self, now_ns: u64) -> Vec<PhiTransition> {
let mut transitions = Vec::new();
for (&site, entry) in self.entries.iter_mut() {
let phi = entry.phi_accrual.phi(now_ns, entry.last_seen_at_ns);
let new_state = if entry.phi_accrual.is_down(now_ns, entry.last_seen_at_ns) {
PhiState::Down
} else if entry.phi_accrual.is_suspect(now_ns, entry.last_seen_at_ns) {
PhiState::Suspect
} else {
PhiState::Live
};
if new_state != entry.last_phi_state {
transitions.push(match new_state {
PhiState::Live => PhiTransition::Live { site },
PhiState::Suspect => PhiTransition::Suspect { site, phi },
PhiState::Down => PhiTransition::Down { site, phi },
});
entry.last_phi_state = new_state;
}
}
transitions
}
}
/// Liveness grade assigned to a site by `RttTracker::scan_phi`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PhiState {
/// Phi is at or below the suspect threshold. This is the default state.
#[default]
Live,
/// Phi is above `threshold_phi` but not above `down_phi`.
Suspect,
/// Phi is above `down_phi`.
Down,
}
/// A change of `PhiState` for one site, as reported by `RttTracker::scan_phi`.
///
/// The variant names the state the site moved *to*.
#[derive(Clone, Copy, Debug)]
pub enum PhiTransition {
/// The site is now graded `Live`.
Live {
site: NodeSiteId,
},
/// The site is now graded `Suspect`; `phi` is the value computed during the scan.
Suspect {
site: NodeSiteId,
phi: f64,
},
/// The site is now graded `Down`; `phi` is the value computed during the scan.
Down {
site: NodeSiteId,
phi: f64,
},
}