use crate::ring::{Distance, Location, PeerKeyLocation};
use pav_regression::IsotonicRegression;
use pav_regression::Point;
use serde::Serialize;
use std::collections::{HashMap, VecDeque};
/// Minimum number of points the global regression must hold before any
/// estimate is produced; below this we report `InsufficientData`.
const MIN_POINTS_FOR_REGRESSION: usize = 5;
/// Upper bound on the rolling window of raw points retained in the
/// regression; the oldest point is evicted when a new one would exceed it.
const MAX_REGRESSION_POINTS: usize = 500;
/// Smoothing factor for the per-peer EWMA adjustments; higher values weight
/// recent observations more heavily.
const EWMA_ALPHA: f64 = 0.1;
/// Distance-based outcome estimator: a global isotonic (monotonic) fit of
/// observed results against routing distance, refined by per-peer residual
/// adjustments.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct IsotonicEstimator {
    /// Monotonic regression of event results against routing distance,
    /// shared across all peers.
    pub global_regression: IsotonicRegression<f64>,
    /// Per-peer EWMA of residuals (event result minus global estimate).
    pub peer_adjustments: HashMap<PeerKeyLocation, Adjustment>,
    /// Rolling window of the raw points currently in `global_regression`,
    /// kept (in insertion order) so the oldest can be evicted; not serialized.
    #[serde(skip)]
    raw_points: VecDeque<Point<f64>>,
}
impl IsotonicEstimator {
    /// Number of virtual observations used to seed a peer adjustment built
    /// from historical events, so a handful of events cannot swing the
    /// adjustment far from the global curve. Also doubles as the minimum
    /// regression size before adjustments are tracked at all.
    const ADJUSTMENT_PRIOR_SIZE: u64 = 10;

    /// Builds an estimator from historical events.
    ///
    /// Fits the global isotonic regression over all (distance, result)
    /// points, then — once the regression is large enough — derives each
    /// peer's adjustment from that peer's residuals against the global fit.
    ///
    /// # Panics
    /// Panics if the underlying regression cannot be constructed.
    pub fn new<I>(history: I, estimator_type: EstimatorType) -> Self
    where
        I: IntoIterator<Item = IsotonicEvent>,
    {
        let mut all_events: Vec<IsotonicEvent> = history.into_iter().collect();
        // Keep only the most recent events; assumes `history` is ordered
        // oldest-first — TODO confirm against callers.
        if all_events.len() > MAX_REGRESSION_POINTS {
            all_events.drain(..all_events.len() - MAX_REGRESSION_POINTS);
        }
        let mut all_points = VecDeque::with_capacity(all_events.len());
        let mut peer_events: HashMap<PeerKeyLocation, Vec<IsotonicEvent>> = HashMap::new();
        for event in all_events {
            let point = Point::new(event.route_distance().as_f64(), event.result);
            all_points.push_back(point);
            peer_events
                .entry(event.peer.clone())
                .or_default()
                .push(event);
        }
        let points: Vec<Point<f64>> = all_points.iter().cloned().collect();
        let global_regression = match estimator_type {
            EstimatorType::Positive => IsotonicRegression::new_ascending(&points),
            EstimatorType::Negative => IsotonicRegression::new_descending(&points),
        }
        .expect("Failed to create isotonic regression");
        let global_regression_big_enough =
            global_regression.len() >= Self::ADJUSTMENT_PRIOR_SIZE as usize;
        let mut peer_adjustments: HashMap<PeerKeyLocation, Adjustment> = HashMap::new();
        if global_regression_big_enough {
            // Consume the map so neither the peer keys nor the event lists
            // need to be cloned.
            for (peer_location, events) in peer_events {
                let mut adjustment = Adjustment::new();
                // Seed with a prior "sample size" so early residuals are
                // damped rather than taken at face value.
                adjustment.effective_count = Self::ADJUSTMENT_PRIOR_SIZE as f64;
                for event in &events {
                    let global_estimate = global_regression
                        .interpolate(event.route_distance().as_f64())
                        .expect("Regression should always produce an estimate");
                    let delta = event.result - global_estimate;
                    adjustment.add(delta);
                }
                peer_adjustments.insert(peer_location, adjustment);
            }
        }
        IsotonicEstimator {
            global_regression,
            peer_adjustments,
            raw_points: all_points,
        }
    }

    /// Incorporates a new event: adds its point to the regression (evicting
    /// the oldest point once the rolling window is full) and folds its
    /// residual into the originating peer's adjustment.
    pub fn add_event(&mut self, event: IsotonicEvent) {
        let route_distance = event.route_distance();
        let point = Point::new(route_distance.as_f64(), event.result);
        self.global_regression.add_points(&[point]);
        self.raw_points.push_back(point);
        // Bound memory and keep the fit responsive: drop the oldest
        // observation from both the window and the regression.
        if self.raw_points.len() > MAX_REGRESSION_POINTS {
            if let Some(oldest) = self.raw_points.pop_front() {
                self.global_regression.remove_points(&[oldest]);
            }
        }
        if self.global_regression.len() >= Self::ADJUSTMENT_PRIOR_SIZE as usize {
            // Skip (rather than panic) in the unlikely case interpolation is
            // unavailable at this distance.
            if let Some(global_estimate) =
                self.global_regression.interpolate(route_distance.as_f64())
            {
                self.peer_adjustments
                    .entry(event.peer)
                    .or_default()
                    .add(event.result - global_estimate);
            }
        }
    }

    /// Estimates the outcome of routing to `peer` for a contract at
    /// `contract_location`: the global estimate at that distance, clamped to
    /// be non-negative, plus the peer's adjustment once that adjustment has
    /// accumulated enough effective observations.
    ///
    /// # Errors
    /// Returns [`EstimationError::InsufficientData`] when the regression has
    /// fewer than `MIN_POINTS_FOR_REGRESSION` points, the peer has no
    /// location, or interpolation yields no value.
    pub fn estimate_retrieval_time(
        &self,
        peer: &PeerKeyLocation,
        contract_location: Location,
    ) -> Result<f64, EstimationError> {
        if self.global_regression.len() < MIN_POINTS_FOR_REGRESSION {
            return Err(EstimationError::InsufficientData);
        }
        let peer_location = peer.location().ok_or(EstimationError::InsufficientData)?;
        let distance: f64 = contract_location.distance(peer_location).as_f64();
        let global_estimate = self
            .global_regression
            .interpolate(distance)
            .ok_or(EstimationError::InsufficientData)?;
        // Negative times are meaningless; clamp the global part (the peer
        // adjustment may still push the final value below zero).
        let global_estimate = global_estimate.max(0.0);
        Ok(self
            .peer_adjustments
            .get(peer)
            .map_or(global_estimate, |peer_adjustment| {
                let should_use_peer_adjustment =
                    peer_adjustment.effective_count >= MIN_POINTS_FOR_REGRESSION as f64;
                global_estimate
                    + if should_use_peer_adjustment {
                        peer_adjustment.value()
                    } else {
                        0.0
                    }
            }))
    }

    /// Number of points currently held by the global regression.
    pub(crate) fn len(&self) -> usize {
        self.global_regression.len()
    }

    /// The fitted curve as `(distance, estimate)` pairs, sorted by distance.
    pub(crate) fn curve_points(&self) -> Vec<(f64, f64)> {
        self.global_regression
            .get_points_sorted()
            .into_iter()
            .map(|p| (*p.x(), *p.y()))
            .collect()
    }
}
/// Direction of the expected monotonic relationship between routing
/// distance and the measured result.
#[derive(Debug, Clone)]
pub(crate) enum EstimatorType {
    /// Result grows with distance — fit an ascending regression.
    Positive,
    /// Result shrinks with distance — fit a descending regression.
    Negative,
}
/// Errors produced when an estimate cannot be computed.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub(crate) enum EstimationError {
    /// The regression has too few points, the peer lacks a location, or
    /// interpolation produced no value.
    #[error("Insufficient data for estimation")]
    InsufficientData,
}
/// A single routing observation: the peer contacted, the contract's
/// location, and the measured result (e.g. retrieval time).
#[derive(Debug, Clone)]
pub(crate) struct IsotonicEvent {
    pub peer: PeerKeyLocation,
    pub contract_location: Location,
    // Measured outcome for this request; units depend on what the caller
    // records (time, failure score, …) — TODO confirm against call sites.
    pub result: f64,
}
impl IsotonicEvent {
fn route_distance(&self) -> Distance {
let peer_location = self
.peer
.location()
.ok_or(EstimationError::InsufficientData)
.expect("IsotonicEvent should always carry a peer location");
self.contract_location.distance(peer_location)
}
}
/// Exponentially weighted moving average of a peer's residuals against the
/// global regression, with an "effective sample size" used to gate when the
/// adjustment is trustworthy.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct Adjustment {
    // Current EWMA of observed residuals.
    smoothed: f64,
    // Effective number of observations under exponential decay; converges
    // to 1/alpha as events accumulate.
    effective_count: f64,
    // Smoothing factor; not serialized, restored from EWMA_ALPHA.
    #[serde(skip)]
    alpha: f64,
}
impl Default for Adjustment {
fn default() -> Self {
Self::new()
}
}
impl Adjustment {
fn new() -> Self {
Self {
smoothed: 0.0,
effective_count: 0.0,
alpha: EWMA_ALPHA,
}
}
fn add(&mut self, value: f64) {
if self.effective_count < 1.0 {
self.smoothed = value;
} else {
self.smoothed = self.alpha * value + (1.0 - self.alpha) * self.smoothed;
}
self.effective_count = 1.0 + (1.0 - self.alpha) * self.effective_count;
}
pub(crate) fn value(&self) -> f64 {
self.smoothed
}
pub(crate) fn event_count(&self) -> u64 {
self.effective_count.round() as u64
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tracing::debug;

    /// The estimator should recover an increasing distance→time curve from
    /// per-peer-noisy observations, generalizing to unseen peers.
    #[test]
    fn test_positive_peer_time_estimator() {
        let mut events = Vec::new();
        for _ in 0..100 {
            let peer = PeerKeyLocation::random();
            if peer.location().is_none() {
                debug!("Peer location is none for {peer:?}");
            }
            let contract_location = Location::random();
            events.push(simulate_positive_request(peer, contract_location));
        }
        // Train on the first half; the second half's peers are unseen, so
        // only the global regression is exercised.
        let (training_events, testing_events) = events.split_at(events.len() / 2);
        let estimator =
            IsotonicEstimator::new(training_events.iter().cloned(), EstimatorType::Positive);
        let mut errors = Vec::new();
        for event in testing_events {
            let estimated_time = estimator
                .estimate_retrieval_time(&event.peer, event.contract_location)
                .unwrap();
            let actual_time = event.result;
            let error = (estimated_time - actual_time).abs();
            errors.push(error);
        }
        let average_error = errors.iter().sum::<f64>() / errors.len() as f64;
        debug!("Average error: {average_error}");
        assert!(average_error < 0.02);
    }

    /// Same as the positive test, but for a decreasing distance→result
    /// relationship fitted with a descending regression.
    #[test]
    fn test_negative_peer_time_estimator() {
        let mut events = Vec::new();
        for _ in 0..100 {
            let peer = PeerKeyLocation::random();
            if peer.location().is_none() {
                debug!("Peer location is none for {peer:?}");
            }
            let contract_location = Location::random();
            events.push(simulate_negative_request(peer, contract_location));
        }
        let (training_events, testing_events) = events.split_at(events.len() / 2);
        let estimator =
            IsotonicEstimator::new(training_events.iter().cloned(), EstimatorType::Negative);
        let mut errors = Vec::new();
        for event in testing_events {
            let estimated_time = estimator
                .estimate_retrieval_time(&event.peer, event.contract_location)
                .unwrap();
            let actual_time = event.result;
            let error = (estimated_time - actual_time).abs();
            errors.push(error);
        }
        let average_error = errors.iter().sum::<f64>() / errors.len() as f64;
        debug!("Average error: {average_error}");
        assert!(average_error < 0.02);
    }

    /// The EWMA must converge to a sustained value and then track a regime
    /// change within a reasonable number of observations.
    #[test]
    fn test_adjustment_ewma_recency() {
        let mut adj = Adjustment::new();
        for _ in 0..100 {
            adj.add(10.0);
        }
        let after_bad = adj.value();
        assert!(
            (after_bad - 10.0).abs() < 0.01,
            "EWMA should converge to 10.0, got {after_bad}"
        );
        for _ in 0..20 {
            adj.add(0.0);
        }
        let after_recovery = adj.value();
        assert!(
            after_recovery < 2.0,
            "EWMA should reflect recent 0.0 events after 20 observations, got {after_recovery}"
        );
    }

    /// The very first observation is adopted directly; subsequent ones blend
    /// via the EWMA.
    #[test]
    fn test_adjustment_ewma_first_observation() {
        let mut adj = Adjustment {
            alpha: 0.5,
            ..Adjustment::new()
        };
        adj.add(5.0);
        assert_eq!(adj.value(), 5.0, "First observation should be set directly");
        assert_eq!(adj.event_count(), 1);
        adj.add(3.0);
        assert!(
            (adj.value() - 4.0).abs() < 1e-10,
            "Second observation should blend via EWMA"
        );
    }

    /// Flooding the estimator past the window size must keep memory bounded
    /// and leave it able to produce estimates.
    #[test]
    fn test_rolling_window_eviction() {
        let peer = PeerKeyLocation::random();
        let contract = Location::random();
        let mut estimator = IsotonicEstimator::new(std::iter::empty(), EstimatorType::Positive);
        for i in 0..(MAX_REGRESSION_POINTS + 100) {
            estimator.add_event(IsotonicEvent {
                peer: peer.clone(),
                contract_location: contract,
                result: i as f64,
            });
        }
        assert!(
            estimator.raw_points.len() <= MAX_REGRESSION_POINTS,
            "Raw points should be bounded, got {}",
            estimator.raw_points.len()
        );
        let result = estimator.estimate_retrieval_time(&peer, contract);
        assert!(
            result.is_ok(),
            "Estimator should produce estimates after eviction"
        );
    }

    /// A peer whose performance improves should see its estimate drop.
    #[test]
    fn test_estimator_adapts_to_regime_change() {
        let peer = PeerKeyLocation::random();
        let contract = Location::new(0.0);
        let mut estimator = IsotonicEstimator::new(std::iter::empty(), EstimatorType::Positive);
        // Background traffic establishes the global curve.
        let peers: Vec<PeerKeyLocation> = (0..5).map(|_| PeerKeyLocation::random()).collect();
        for _ in 0..30 {
            for p in &peers {
                estimator.add_event(IsotonicEvent {
                    peer: p.clone(),
                    contract_location: contract,
                    result: 100.0,
                });
            }
        }
        // Target peer is slow at first…
        for _ in 0..20 {
            estimator.add_event(IsotonicEvent {
                peer: peer.clone(),
                contract_location: contract,
                result: 200.0,
            });
        }
        let estimate_before = estimator
            .estimate_retrieval_time(&peer, contract)
            .unwrap_or(0.0);
        // …then improves sharply.
        for _ in 0..20 {
            estimator.add_event(IsotonicEvent {
                peer: peer.clone(),
                contract_location: contract,
                result: 50.0,
            });
        }
        let estimate_after = estimator
            .estimate_retrieval_time(&peer, contract)
            .unwrap_or(0.0);
        assert!(
            estimate_after < estimate_before,
            "Estimate should decrease after peer improves: before={estimate_before}, after={estimate_after}"
        );
    }

    /// Deterministic per-peer noise derived from the peer's public key.
    ///
    /// FIX: the original returned the raw hash byte (0–255), which dwarfs
    /// the simulated signal (`d.powf(0.5)` ≤ ~1) and — since test peers are
    /// unseen during training and get no adjustment — would blow the
    /// `average_error < 0.02` assertions. Scale the byte into [0, 0.01] so
    /// the noise stays well inside the error budget.
    fn peer_noise(peer: &PeerKeyLocation) -> f64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        peer.pub_key().to_string().hash(&mut hasher);
        (hasher.finish() as u8) as f64 / 255.0 * 0.01
    }

    /// Builds an event whose result is `result_fn(distance)` plus the
    /// peer's deterministic noise.
    fn simulate_request(
        peer: PeerKeyLocation,
        contract_location: Location,
        result_fn: impl FnOnce(f64) -> f64,
    ) -> IsotonicEvent {
        let distance = peer
            .location()
            .unwrap()
            .distance(contract_location)
            .as_f64();
        let result = result_fn(distance) + peer_noise(&peer);
        IsotonicEvent {
            peer,
            contract_location,
            result,
        }
    }

    /// Result increases with distance (sqrt curve).
    fn simulate_positive_request(
        peer: PeerKeyLocation,
        contract_location: Location,
    ) -> IsotonicEvent {
        simulate_request(peer, contract_location, |d| d.powf(0.5))
    }

    /// Result decreases with distance.
    fn simulate_negative_request(
        peer: PeerKeyLocation,
        contract_location: Location,
    ) -> IsotonicEvent {
        simulate_request(peer, contract_location, |d| (100.0 - d).powf(0.5))
    }
}