use std::collections::HashMap;
use std::time::Duration;
use crate::adapter::net::state::horizon::ObservedHorizon;
use crate::adapter::net::subnet::SubnetId;
use super::propagation::PropagationModel;
/// Upper bound on the number of entities for which an [`ObservationWindow`]
/// keeps per-entity timing metadata.
///
/// When a previously untracked entity is observed while the window already
/// holds this many entries, the entry with the smallest last-observed
/// timestamp is evicted to make room.
pub const MAX_TRACKED_ENTITIES: usize = 65_536;
/// A node-local view of which entities have been observed, when they were
/// last observed, and how long their events are estimated to take to arrive.
pub struct ObservationWindow {
/// Per-entity sequence state; receives every `(origin_hash, sequence)` pair
/// passed to `observe` / `observe_with_context`.
horizon: ObservedHorizon,
/// Subnet this window belongs to; passed together with the source subnet to
/// `PropagationModel::estimate_latency`.
local_subnet: SubnetId,
/// Origin hash -> value of `current_timestamp()` at the most recent
/// observation. `staleness` interprets the difference between two such
/// values as nanoseconds.
last_observed: HashMap<u64, u64>,
/// Origin hash -> estimated propagation latency in nanoseconds. Only
/// written by `observe_with_context`; lookups treat a missing entry as 0.
estimated_delay: HashMap<u64, u64>,
}
impl ObservationWindow {
    /// Creates an empty window for `local_subnet` backed by a fresh
    /// [`ObservedHorizon`].
    pub fn new(local_subnet: SubnetId) -> Self {
        Self::from_horizon(ObservedHorizon::new(), local_subnet)
    }

    /// Creates a window around an existing `horizon`.
    ///
    /// The timing maps start empty, so entities already present in `horizon`
    /// report no staleness until they are observed through this window.
    pub fn from_horizon(horizon: ObservedHorizon, local_subnet: SubnetId) -> Self {
        Self {
            horizon,
            local_subnet,
            last_observed: HashMap::new(),
            estimated_delay: HashMap::new(),
        }
    }

    /// Narrows a nanosecond count to `u64`, clamping at `u64::MAX` instead of
    /// silently discarding the high bits the way a bare `as u64` would.
    fn saturating_nanos(nanos: u128) -> u64 {
        // The clamp guarantees the value fits, so the cast below is lossless.
        nanos.min(u128::from(u64::MAX)) as u64
    }

    /// Makes room for `origin_hash` in the timing maps if necessary.
    ///
    /// Nothing happens when `origin_hash` is already tracked (the upcoming
    /// insert is an update) or when the window is below
    /// [`MAX_TRACKED_ENTITIES`]. Otherwise the entry with the smallest
    /// last-observed timestamp is removed from both timing maps. The horizon
    /// itself is not touched.
    fn evict_if_at_cap(&mut self, origin_hash: u64) {
        let already_tracked = self.last_observed.contains_key(&origin_hash);
        if already_tracked || self.last_observed.len() < MAX_TRACKED_ENTITIES {
            return;
        }

        // Linear scan for the oldest entry; only reached when a new entity
        // arrives while the window is full.
        let oldest = self
            .last_observed
            .iter()
            .min_by_key(|&(_, &observed_at)| observed_at)
            .map(|(&origin, _)| origin);

        if let Some(oldest) = oldest {
            self.last_observed.remove(&oldest);
            self.estimated_delay.remove(&oldest);
        }
    }

    /// Records an observation of `origin_hash` at `sequence` and stores an
    /// estimated propagation delay for it.
    ///
    /// The delay is `model.estimate_latency(source_subnet, local_subnet,
    /// hop_count)` converted to nanoseconds, saturating at `u64::MAX`.
    pub fn observe_with_context(
        &mut self,
        origin_hash: u64,
        sequence: u64,
        hop_count: u8,
        source_subnet: SubnetId,
        model: &PropagationModel,
    ) {
        self.horizon.observe(origin_hash, sequence);
        self.evict_if_at_cap(origin_hash);

        let now = current_timestamp();
        self.last_observed.insert(origin_hash, now);

        let latency = model.estimate_latency(source_subnet, self.local_subnet, hop_count);
        self.estimated_delay
            .insert(origin_hash, Self::saturating_nanos(latency.as_nanos()));
    }

    /// Records an observation of `origin_hash` at `sequence` without any
    /// propagation context.
    ///
    /// Only the last-observed timestamp is refreshed; an estimated delay
    /// stored by an earlier [`observe_with_context`](Self::observe_with_context)
    /// call is left as is.
    pub fn observe(&mut self, origin_hash: u64, sequence: u64) {
        self.horizon.observe(origin_hash, sequence);
        self.evict_if_at_cap(origin_hash);
        self.last_observed.insert(origin_hash, current_timestamp());
    }

    /// Returns how long ago `origin_hash` was last observed, or `None` if it
    /// is not tracked.
    ///
    /// The result saturates at zero if the stored timestamp is ahead of the
    /// current one.
    pub fn staleness(&self, origin_hash: u64) -> Option<Duration> {
        self.last_observed.get(&origin_hash).map(|&observed_at| {
            let now = current_timestamp();
            Duration::from_nanos(now.saturating_sub(observed_at))
        })
    }

    /// Cone test against an explicit `now`, shared by the single-entity and
    /// bulk queries so they apply the same rule.
    ///
    /// An entity without a recorded delay is treated as having delay 0.
    fn cone_contains(
        &self,
        origin_hash: u64,
        observed_at: u64,
        now: u64,
        max_delay_nanos: u64,
    ) -> bool {
        let delay = self.estimated_delay.get(&origin_hash).copied().unwrap_or(0);
        now.saturating_sub(observed_at) <= max_delay_nanos.saturating_add(delay)
    }

    /// Returns `true` when `origin_hash` is tracked and its staleness does
    /// not exceed `max_delay_nanos` plus its estimated delay.
    ///
    /// Untracked entities are never within the cone.
    pub fn is_within_cone(&self, origin_hash: u64, max_delay_nanos: u64) -> bool {
        match self.last_observed.get(&origin_hash) {
            Some(&observed_at) => {
                self.cone_contains(origin_hash, observed_at, current_timestamp(), max_delay_nanos)
            }
            None => false,
        }
    }

    /// Returns the origin hashes of all tracked entities that are within the
    /// cone for `max_delay_nanos`. The order of the result is unspecified.
    ///
    /// The clock is read once so that every entity is judged against the same
    /// instant, and the map entries are walked directly instead of looking
    /// each key up again.
    pub fn reachable_entities(&self, max_delay_nanos: u64) -> Vec<u64> {
        let now = current_timestamp();
        self.last_observed
            .iter()
            .filter(|&(&origin, &observed_at)| {
                self.cone_contains(origin, observed_at, now, max_delay_nanos)
            })
            .map(|(&origin, _)| origin)
            .collect()
    }

    /// Compares this window's horizon with `other`'s.
    ///
    /// Counts entities present on only one side and entities present on both,
    /// and sums the absolute sequence difference over the shared entities.
    /// Every accumulator saturates, so an overflow can never wrap a non-zero
    /// divergence back to zero.
    pub fn divergence_from(&self, other: &ObservationWindow) -> HorizonDivergence {
        let mut only_self = 0u32;
        let mut only_other = 0u32;
        let mut common = 0u32;
        let mut seq_diff_sum = 0u64;

        for (&origin, &self_seq) in self.horizon.iter() {
            match other.horizon.get(origin) {
                Some(other_seq) => {
                    common = common.saturating_add(1);
                    seq_diff_sum = seq_diff_sum.saturating_add(self_seq.abs_diff(other_seq));
                }
                None => only_self = only_self.saturating_add(1),
            }
        }

        for (&origin, _) in other.horizon.iter() {
            if self.horizon.get(origin).is_none() {
                only_other = only_other.saturating_add(1);
            }
        }

        HorizonDivergence {
            entities_only_self: only_self,
            entities_only_other: only_other,
            common_entities: common,
            total_seq_difference: seq_diff_sum,
        }
    }

    /// Borrows the underlying horizon.
    pub fn horizon(&self) -> &ObservedHorizon {
        &self.horizon
    }

    /// Returns the subnet this window was created for.
    pub fn local_subnet(&self) -> SubnetId {
        self.local_subnet
    }

    /// Returns the number of entities known to the horizon.
    ///
    /// This is the horizon's count, not the size of the timing maps.
    pub fn entity_count(&self) -> usize {
        self.horizon.entity_count()
    }
}
impl std::fmt::Debug for ObservationWindow {
    /// Prints a compact summary: the local subnet, the horizon's entity
    /// count, and the number of entries in the timing map. Map contents are
    /// not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let horizon_entities = self.horizon.entity_count();
        let tracked_entries = self.last_observed.len();

        let mut summary = f.debug_struct("ObservationWindow");
        summary.field("subnet", &self.local_subnet);
        summary.field("entities", &horizon_entities);
        summary.field("tracked", &tracked_entries);
        summary.finish()
    }
}
/// Summary of how two horizons differ, as produced by
/// `ObservationWindow::divergence_from`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HorizonDivergence {
    /// Entities present only in the horizon the comparison was called on.
    pub entities_only_self: u32,
    /// Entities present only in the horizon that was passed in.
    pub entities_only_other: u32,
    /// Entities present in both horizons.
    pub common_entities: u32,
    /// Sum of absolute sequence differences over the common entities.
    pub total_seq_difference: u64,
}

impl HorizonDivergence {
    /// Returns `true` when neither side has an entity the other lacks and the
    /// shared entities have no sequence difference.
    ///
    /// `common_entities` plays no part in the decision.
    pub fn is_converged(&self) -> bool {
        let differences = (
            self.entities_only_self,
            self.entities_only_other,
            self.total_seq_difference,
        );
        matches!(differences, (0, 0, 0))
    }
}
use crate::adapter::net::current_timestamp;
#[cfg(test)]
mod tests {
    use super::*;

    /// A just-observed entity has a small staleness; an unknown one has none.
    #[test]
    fn test_observe_and_staleness() {
        let mut win = ObservationWindow::new(SubnetId::new(&[1]));
        win.observe(0xAAAA, 10);

        let age = win.staleness(0xAAAA);
        assert!(matches!(age, Some(elapsed) if elapsed < Duration::from_secs(1)));
        assert_eq!(win.staleness(0xBBBB), None);
    }

    /// Observing with propagation context still registers the entity and its
    /// sequence in the horizon.
    #[test]
    fn test_observe_with_context() {
        let propagation = PropagationModel::new();
        let mut win = ObservationWindow::new(SubnetId::new(&[1]));

        win.observe_with_context(0xAAAA, 42, 2, SubnetId::new(&[1, 2]), &propagation);

        assert_eq!(win.entity_count(), 1);
        assert!(win.horizon().has_observed(0xAAAA, 42));
    }

    /// Fresh observations are inside a one-second cone; unknown entities are not.
    #[test]
    fn test_is_within_cone() {
        let one_second_nanos = 1_000_000_000;
        let mut win = ObservationWindow::new(SubnetId::new(&[1]));
        win.observe(0xAAAA, 10);

        assert!(win.is_within_cone(0xAAAA, one_second_nanos));
        assert!(!win.is_within_cone(0xBBBB, one_second_nanos));
    }

    /// Two windows with identical observations are converged.
    #[test]
    fn test_divergence_identical() {
        let mut left = ObservationWindow::new(SubnetId::new(&[1]));
        let mut right = ObservationWindow::new(SubnetId::new(&[1]));
        left.observe(0xAAAA, 10);
        right.observe(0xAAAA, 10);

        assert!(left.divergence_from(&right).is_converged());
    }

    /// One shared entity (sequence gap 5) plus one exclusive entity per side.
    #[test]
    fn test_divergence_different() {
        let mut left = ObservationWindow::new(SubnetId::new(&[1]));
        let mut right = ObservationWindow::new(SubnetId::new(&[2]));

        left.observe(0xAAAA, 10);
        left.observe(0xBBBB, 5);
        right.observe(0xAAAA, 15);
        right.observe(0xCCCC, 20);

        let expected = HorizonDivergence {
            entities_only_self: 1,
            entities_only_other: 1,
            common_entities: 1,
            total_seq_difference: 5,
        };
        assert_eq!(left.divergence_from(&right), expected);
    }

    /// At the cap, re-observing a tracked entity evicts nothing, while a new
    /// entity displaces the one with the smallest timestamp.
    #[test]
    fn observation_window_evicts_oldest_at_cap() {
        let mut win = ObservationWindow::new(SubnetId::new(&[1]));

        // Fill the window directly, using each id as its own timestamp so the
        // eviction order is known.
        for id in 0..MAX_TRACKED_ENTITIES as u64 {
            win.last_observed.insert(id, id);
            win.estimated_delay.insert(id, 0);
            win.horizon.observe(id, 0);
        }
        assert_eq!(win.last_observed.len(), MAX_TRACKED_ENTITIES);

        // Entity 0 is already tracked: its timestamp is refreshed in place.
        win.observe(0, 1);
        assert_eq!(win.last_observed.len(), MAX_TRACKED_ENTITIES);
        assert!(win.last_observed.contains_key(&0));

        // A new entity forces out entity 1, which now has the oldest timestamp.
        let newcomer = (MAX_TRACKED_ENTITIES + 1000) as u64;
        win.observe(newcomer, 1);
        assert_eq!(win.last_observed.len(), MAX_TRACKED_ENTITIES);
        assert!(win.last_observed.contains_key(&newcomer));
        assert!(
            !win.last_observed.contains_key(&1),
            "oldest-by-timestamp entry must have been evicted",
        );
        assert!(!win.estimated_delay.contains_key(&1));
    }

    /// Summing two maximal sequence gaps clamps at `u64::MAX`.
    #[test]
    fn divergence_seq_diff_saturates_on_overflow() {
        let mut left = ObservationWindow::new(SubnetId::new(&[1]));
        let mut right = ObservationWindow::new(SubnetId::new(&[1]));

        left.observe(0xAAAA, u64::MAX);
        right.observe(0xAAAA, 0);
        left.observe(0xBBBB, u64::MAX);
        right.observe(0xBBBB, 0);

        let divergence = left.divergence_from(&right);
        assert_eq!(divergence.common_entities, 2);
        assert_eq!(
            divergence.total_seq_difference,
            u64::MAX,
            "two u64::MAX-sized diffs must saturate, not wrap",
        );
        assert!(
            !divergence.is_converged(),
            "saturated divergence must not falsely report converged",
        );
    }

    /// Both freshly observed entities are reachable within one second.
    #[test]
    fn test_reachable_entities() {
        let mut win = ObservationWindow::new(SubnetId::new(&[1]));
        win.observe(0xAAAA, 10);
        win.observe(0xBBBB, 20);

        let within_cone = win.reachable_entities(1_000_000_000);
        assert_eq!(within_cone.len(), 2);
    }
}