use crate::{
ospf::OspfProcess,
router::Router,
types::{PhysicalNetwork, Prefix, RouterId},
};
use geoutils::Location;
use itertools::Itertools;
use ordered_float::NotNan;
use priority_queue::PriorityQueue;
use rand::prelude::*;
use rand_distr::Beta;
use serde::{Deserialize, Serialize};
use std::{
cmp::Reverse,
collections::{BTreeMap, HashMap, HashSet},
iter::zip,
};
use super::{Event, EventQueue};
/// Event queue that orders events by a randomly sampled arrival time.
///
/// On `push`, each event is assigned `current_time` plus a delay sampled from the
/// [`ModelParams`] registered for its `(source, router)` pair, falling back to
/// `default_params` when no specific parameters were set. Events are stored with
/// priority `Reverse(time)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(deserialize = "P: for<'a> serde::Deserialize<'a>"))]
#[cfg_attr(docsrs, doc(cfg(feature = "rand_queue")))]
pub struct SimpleTimingModel<P: Prefix> {
// Queued events, keyed by `Reverse(arrival time)`.
// NOTE(review): presumably the earliest event is popped first — confirm against `PriorityQueue::pop`.
q: PriorityQueue<Event<P, NotNan<f64>>, Reverse<NotNan<f64>>>,
// Per `(src, dst)` pair: number of BGP events currently counted as queued, and the
// arrival time assigned to the most recently pushed one.
messages: HashMap<(RouterId, RouterId), (usize, NotNan<f64>)>,
// Delay parameters registered for specific `(src, dst)` pairs via `set_parameters`.
model: HashMap<(RouterId, RouterId), ModelParams>,
// Delay parameters used for every pair that has no entry in `model`.
default_params: ModelParams,
// Arrival time of the most recently popped event (zero after `new` / `clear`).
current_time: NotNan<f64>,
}
impl<P: Prefix> SimpleTimingModel<P> {
    /// Build an empty queue whose delays are drawn from `default_params` unless a
    /// pair-specific override is registered with [`Self::set_parameters`].
    ///
    /// The clock starts at the default value of `NotNan<f64>`.
    pub fn new(default_params: ModelParams) -> Self {
        let current_time = NotNan::default();
        Self {
            current_time,
            default_params,
            model: HashMap::new(),
            messages: HashMap::new(),
            q: PriorityQueue::new(),
        }
    }

    /// Register `params` as the delay distribution for events going from `src` to
    /// `dst`, replacing any parameters previously stored for that ordered pair.
    pub fn set_parameters(&mut self, src: RouterId, dst: RouterId, params: ModelParams) {
        let session = (src, dst);
        self.model.insert(session, params);
    }
}
impl<P: Prefix> EventQueue<P> for SimpleTimingModel<P> {
    type Priority = NotNan<f64>;

    /// Enqueue `event` with an arrival time of `current_time` plus a sampled delay.
    ///
    /// For BGP events, if another BGP event on the same `(source, router)` pair is
    /// still counted as queued and was assigned a later time than the freshly sampled
    /// one, the new event is instead scheduled `collision` after that earlier event.
    ///
    /// # Panics
    ///
    /// Panics if the sampled delay is NaN.
    fn push<Ospf: OspfProcess>(
        &mut self,
        mut event: Event<P, Self::Priority>,
        _routers: &BTreeMap<RouterId, Router<P, Ospf>>,
        _net: &PhysicalNetwork,
    ) {
        let session = (event.source(), event.router());
        let mut rng = thread_rng();

        // Pair-specific parameters win over the defaults.
        let params = match self.model.get_mut(&session) {
            Some(specific) => specific,
            None => &mut self.default_params,
        };
        let mut arrival = self.current_time + NotNan::new(params.sample(&mut rng)).unwrap();

        if event.is_bgp_event() {
            match self.messages.get_mut(&session) {
                Some((in_flight, last_arrival)) => {
                    // Never schedule this message before one already queued on the session.
                    if *in_flight > 0 && *last_arrival > arrival {
                        arrival = *last_arrival + params.collision;
                    }
                    *in_flight += 1;
                    *last_arrival = arrival;
                }
                None => {
                    self.messages.insert(session, (1, arrival));
                }
            }
        }

        *event.priority_mut() = arrival;
        self.q.push(event, Reverse(arrival));
    }

    /// Remove and return the next event, advancing `current_time` to its arrival
    /// time. For BGP events, the per-session counter of queued messages is decremented.
    fn pop(&mut self) -> Option<Event<P, Self::Priority>> {
        let (event, _) = self.q.pop()?;
        self.current_time = *event.priority();
        if let Event::Bgp { src, dst, .. } = &event {
            if let Some(entry) = self.messages.get_mut(&(*src, *dst)) {
                entry.0 -= 1;
            }
        }
        Some(event)
    }

    /// Borrow the event that `pop` would return next, without removing it.
    fn peek(&self) -> Option<&Event<P, Self::Priority>> {
        let (event, _) = self.q.peek()?;
        Some(event)
    }

    /// Number of events currently queued.
    fn len(&self) -> usize {
        self.q.len()
    }

    /// Whether no events are queued.
    fn is_empty(&self) -> bool {
        self.q.is_empty()
    }

    /// Drop all queued events and per-session bookkeeping, and reset the clock.
    fn clear(&mut self) {
        self.current_time = NotNan::default();
        self.messages.clear();
        self.q.clear();
    }

    /// Current time of the queue; always `Some` for this model.
    fn get_time(&self) -> Option<f64> {
        let now = self.current_time;
        Some(now.into_inner())
    }

    /// This model does not depend on the network state, so there is nothing to update.
    fn update_params<Ospf: OspfProcess>(
        &mut self,
        _: &BTreeMap<RouterId, Router<P, Ospf>>,
        _: &PhysicalNetwork,
    ) {
    }

    /// Return `conquered` with its queued events, per-session bookkeeping and clock
    /// replaced by copies of those in `self`; all other fields of `conquered` are kept.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract the caller must uphold is defined by the
    /// `EventQueue` trait and is not visible here — consult the trait documentation.
    unsafe fn clone_events(&self, conquered: Self) -> Self {
        let mut merged = conquered;
        merged.q = self.q.clone();
        merged.messages = self.messages.clone();
        merged.current_time = self.current_time;
        merged
    }
}
impl<P: Prefix> PartialEq for SimpleTimingModel<P> {
    /// Two queues are equal when iterating over their queued `(event, priority)`
    /// entries yields equal sequences. Parameters, bookkeeping and the clock are
    /// not compared.
    fn eq(&self, other: &Self) -> bool {
        self.q.iter().eq(other.q.iter())
    }
}
/// Event queue whose arrival times are derived from router distances.
///
/// The arrival time of an event is `current_time` plus a propagation term (the
/// precomputed path delay plus a sampled queuing delay scaled by the path length)
/// plus a processing delay sampled from the parameters of the event's source router.
/// Path delays are recomputed from the OSPF next-hops in `update_params`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(deserialize = "P: for<'a> serde::Deserialize<'a>"))]
#[cfg_attr(docsrs, doc(cfg(feature = "rand_queue")))]
pub struct GeoTimingModel<P: Prefix> {
// Queued events, keyed by `Reverse(arrival time)`.
q: PriorityQueue<Event<P, NotNan<f64>>, Reverse<NotNan<f64>>>,
// Per `(src, dst)` pair: number of BGP events currently counted as queued, and the
// arrival time assigned to the most recently pushed one.
messages: HashMap<(RouterId, RouterId), (usize, NotNan<f64>)>,
// Processing-delay parameters registered for individual routers via `set_parameters`.
processing_params: HashMap<RouterId, ModelParams>,
// Processing-delay parameters for routers without an entry in `processing_params`.
default_processing_params: ModelParams,
// Parameters of the queuing delay sampled in `propagation_time`.
queuing_params: ModelParams,
// Per `(router, target)` pair: (path delay, number of routers on the path).
// Unreachable or looping pairs hold `(GEO_TIMING_MODEL_MAX_DELAY, 0)`.
paths: HashMap<(RouterId, RouterId), (f64, usize)>,
// Distance between pairs of routers; `new` fills it in meters from the geo locations.
distances: HashMap<(RouterId, RouterId), NotNan<f64>>,
// Arrival time of the most recently popped event (zero after `new` / `clear`).
current_time: NotNan<f64>,
}
/// Delay used when no path is stored for a pair of routers, and as the per-link
/// fallback when the distance between two adjacent routers is unknown.
const GEO_TIMING_MODEL_DEFAULT_DELAY: f64 = 0.0001;
/// Delay stored for pairs of routers that have no path (no next-hop, or a forwarding loop).
const GEO_TIMING_MODEL_MAX_DELAY: f64 = 10.0;
/// Factor that turns a distance into a delay: the reciprocal of 299792458.
/// NOTE(review): presumably the speed of light in m/s, yielding seconds per meter — confirm.
const GEO_TIMING_MODEL_F_LIGHT_SPEED: f64 = 1.0 / 299792458.0;
impl<P: Prefix> GeoTimingModel<P> {
    /// Create an empty queue.
    ///
    /// `default_processing_params` is used for routers without specific processing
    /// parameters, `queuing_params` for the queuing delay. The distance (in meters)
    /// between every ordered pair of routers in `geo_location` is precomputed using
    /// `Location::distance_to`, falling back to `haversine_distance_to` if that fails.
    ///
    /// # Panics
    ///
    /// Panics if a computed distance is NaN.
    pub fn new(
        default_processing_params: ModelParams,
        queuing_params: ModelParams,
        geo_location: &HashMap<RouterId, Location>,
    ) -> Self {
        let distances = geo_location
            .iter()
            .flat_map(|l1| geo_location.iter().map(move |l2| (l1, l2)))
            .map(|((r1, p1), (r2, p2))| {
                (
                    (*r1, *r2),
                    NotNan::new(
                        p1.distance_to(p2)
                            .unwrap_or_else(|_| p1.haversine_distance_to(p2))
                            .meters(),
                    )
                    .unwrap(),
                )
            })
            .collect();
        Self {
            q: PriorityQueue::new(),
            messages: HashMap::new(),
            processing_params: HashMap::new(),
            default_processing_params,
            queuing_params,
            paths: HashMap::new(),
            distances,
            current_time: NotNan::default(),
        }
    }

    /// Set the processing-delay parameters of `router`, replacing any previous ones.
    pub fn set_parameters(&mut self, router: RouterId, params: ModelParams) {
        self.processing_params.insert(router, params);
    }

    /// Get the stored distance from `src` to `dst`, or `None` if it is unknown.
    pub fn get_distance(&self, src: RouterId, dst: RouterId) -> Option<f64> {
        self.distances.get(&(src, dst)).map(|x| x.into_inner())
    }

    /// Store `dist` as the distance between `src` and `dst`, in both directions.
    ///
    /// # Panics
    ///
    /// Panics if `dist` is NaN.
    pub fn set_distance(&mut self, src: RouterId, dst: RouterId, dist: f64) {
        let dist = NotNan::new(dist).unwrap();
        self.distances.insert((src, dst), dist);
        self.distances.insert((dst, src), dist);
    }

    /// Compute the path from `router` to `target` by following the first OSPF
    /// next-hop of each router, and store its delay and length in `self.paths`.
    ///
    /// `path_cache` memoizes the router sequence of every `(router, target)` pair
    /// visited (`None` when no path exists); `loop_protection` holds the routers on
    /// the current recursion stack so that forwarding loops are detected. Pairs
    /// without a path are stored as `(GEO_TIMING_MODEL_MAX_DELAY, 0)`.
    fn recursive_compute_paths<Ospf: OspfProcess>(
        &mut self,
        router: RouterId,
        target: RouterId,
        loop_protection: &mut HashSet<RouterId>,
        routers: &BTreeMap<RouterId, Router<P, Ospf>>,
        path_cache: &mut HashMap<(RouterId, RouterId), Option<Vec<RouterId>>>,
    ) {
        // Trivial path: the router is the target itself.
        if router == target {
            path_cache.insert((router, target), Some(vec![router]));
            self.paths.insert((router, target), (0.0, 0));
            return;
        }

        // The router is already on the recursion stack: forwarding loop.
        if !loop_protection.insert(router) {
            path_cache.insert((router, target), None);
            self.paths
                .insert((router, target), (GEO_TIMING_MODEL_MAX_DELAY, 0));
            return;
        }

        // Extend the path of the first next-hop (computing it first if necessary).
        let new_path = if let Some(nh) = routers
            .get(&router)
            .map(|r| r.ospf.get(target))
            .and_then(|nhs| nhs.first().copied())
        {
            if !path_cache.contains_key(&(nh, target)) {
                self.recursive_compute_paths(nh, target, loop_protection, routers, path_cache);
            }
            // The recursive call always inserts an entry for `(nh, target)`.
            path_cache.get(&(nh, target)).unwrap().as_ref().map(|path| {
                std::iter::once(router)
                    .chain(path.iter().copied())
                    .collect_vec()
            })
        } else {
            None
        };

        if let Some(path) = new_path {
            // Sum the delay of every link on the path. Known distances are converted
            // into a delay; links with an unknown distance contribute the default
            // delay as is (it already is a delay and must not be scaled).
            let delay: f64 = zip(&path[..path.len() - 1], &path[1..])
                .map(|(a, b)| {
                    self.distances
                        .get(&(*a, *b))
                        .map(|dist| dist.into_inner() * GEO_TIMING_MODEL_F_LIGHT_SPEED)
                        .unwrap_or(GEO_TIMING_MODEL_DEFAULT_DELAY)
                })
                .sum();
            self.paths.insert((router, target), (delay, path.len()));
            path_cache.insert((router, target), Some(path));
        } else {
            path_cache.insert((router, target), None);
            self.paths
                .insert((router, target), (GEO_TIMING_MODEL_MAX_DELAY, 0));
        }

        // Leave the recursion stack again.
        loop_protection.remove(&router);
    }

    /// Reset the current time to zero, but only if the queue is empty.
    pub fn reset_time(&mut self) {
        if self.is_empty() {
            self.current_time = Default::default();
        }
    }

    /// Sample the propagation time of a message from `source` to `target`.
    ///
    /// If a path is stored, the result is its delay plus a single queuing-delay
    /// sample multiplied by the stored path length minus one (saturating at zero).
    /// Without a stored path, `GEO_TIMING_MODEL_DEFAULT_DELAY` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the resulting time is NaN.
    #[inline]
    fn propagation_time(
        &mut self,
        source: RouterId,
        target: RouterId,
        rng: &mut ThreadRng,
    ) -> NotNan<f64> {
        NotNan::new(match self.paths.get(&(source, target)) {
            Some((delay, n_hops)) => {
                delay + self.queuing_params.sample(rng) * n_hops.saturating_sub(1) as f64
            }
            None => GEO_TIMING_MODEL_DEFAULT_DELAY,
        })
        .unwrap()
    }
}
impl<P: Prefix> PartialEq for GeoTimingModel<P> {
    /// Two queues are equal when iterating over their queued `(event, priority)`
    /// entries yields equal sequences. Parameters, paths, distances and the clock
    /// are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.q.iter().eq(other.q.iter())
    }
}
impl<P: Prefix> EventQueue<P> for GeoTimingModel<P> {
    type Priority = NotNan<f64>;

    /// Enqueue `event` with an arrival time of `current_time` plus the propagation
    /// time from `src` to `dst` plus a processing delay sampled from the parameters
    /// of `src` (or the default processing parameters).
    ///
    /// For BGP events, if another BGP event on the same `(src, dst)` pair is still
    /// counted as queued and was assigned a later time, the new event is instead
    /// scheduled `collision` after that earlier event.
    ///
    /// # Panics
    ///
    /// Panics if a sampled delay is NaN.
    fn push<Ospf: OspfProcess>(
        &mut self,
        mut event: Event<P, Self::Priority>,
        _: &BTreeMap<RouterId, Router<P, Ospf>>,
        _: &PhysicalNetwork,
    ) {
        let mut next_time = self.current_time;
        let mut rng = thread_rng();
        match event {
            Event::Bgp {
                p: ref mut t,
                src,
                dst,
                ..
            } => {
                let key = (src, dst);
                next_time += self.propagation_time(src, dst, &mut rng);
                let beta = self
                    .processing_params
                    .get_mut(&src)
                    .unwrap_or(&mut self.default_processing_params);
                next_time += NotNan::new(beta.sample(&mut rng)).unwrap();
                // Never schedule this message before one already queued on the session.
                if let Some((ref mut num, ref mut time)) = self.messages.get_mut(&key) {
                    if *num > 0 && *time > next_time {
                        next_time = *time + beta.collision;
                    }
                    *num += 1;
                    *time = next_time;
                } else {
                    self.messages.insert(key, (1, next_time));
                }
                *t = next_time;
            }
            Event::Ospf {
                p: ref mut t,
                src,
                dst,
                ..
            } => {
                next_time += self.propagation_time(src, dst, &mut rng);
                let beta = self
                    .processing_params
                    .get_mut(&src)
                    .unwrap_or(&mut self.default_processing_params);
                next_time += NotNan::new(beta.sample(&mut rng)).unwrap();
                *t = next_time;
            }
        }
        self.q.push(event, Reverse(next_time));
    }

    /// Remove and return the next event, advancing `current_time` to its arrival
    /// time. For BGP events, the per-session counter of queued messages is decremented.
    fn pop(&mut self) -> Option<Event<P, Self::Priority>> {
        let (event, _) = self.q.pop()?;
        self.current_time = *event.priority();
        match event {
            Event::Bgp { src, dst, .. } => {
                if let Some((num, _)) = self.messages.get_mut(&(src, dst)) {
                    *num -= 1;
                }
            }
            Event::Ospf { .. } => {}
        }
        Some(event)
    }

    /// Borrow the event that `pop` would return next, without removing it.
    fn peek(&self) -> Option<&Event<P, Self::Priority>> {
        self.q.peek().map(|(e, _)| e)
    }

    /// Number of events currently queued.
    fn len(&self) -> usize {
        self.q.len()
    }

    /// Whether no events are queued.
    fn is_empty(&self) -> bool {
        self.q.is_empty()
    }

    /// Drop all queued events and per-session bookkeeping, and reset the clock.
    /// Paths, distances and parameters are kept.
    fn clear(&mut self) {
        self.q.clear();
        self.messages.clear();
        self.current_time = NotNan::default();
    }

    /// Current time of the queue; always `Some` for this model.
    fn get_time(&self) -> Option<f64> {
        Some(self.current_time.into_inner())
    }

    /// Recompute the path delay between every ordered pair of routers from the
    /// current OSPF next-hops.
    fn update_params<Ospf: OspfProcess>(
        &mut self,
        routers: &BTreeMap<RouterId, Router<P, Ospf>>,
        _: &PhysicalNetwork,
    ) {
        self.paths.clear();
        // One cache shared by all pairs: the path of `(router, target)` only depends
        // on that pair, so sub-paths found for one source are reused by all others
        // instead of being walked again from scratch.
        let mut path_cache = HashMap::new();
        for src in routers.keys() {
            for dst in routers.keys() {
                self.recursive_compute_paths(
                    *src,
                    *dst,
                    &mut HashSet::new(),
                    routers,
                    &mut path_cache,
                );
            }
        }
    }

    /// Return `conquered` with its queued events, per-session bookkeeping and clock
    /// replaced by copies of those in `self`; all other fields of `conquered` are kept.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract the caller must uphold is defined by the
    /// `EventQueue` trait and is not visible here — consult the trait documentation.
    unsafe fn clone_events(&self, conquered: Self) -> Self {
        GeoTimingModel {
            q: self.q.clone(),
            messages: self.messages.clone(),
            current_time: self.current_time,
            ..conquered
        }
    }
}
/// Parameters of a delay distribution: a Beta distribution with shape parameters
/// `alpha` and `beta`, multiplied by `scale` and shifted by `offset`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(docsrs, doc(cfg(feature = "rand_queue")))]
pub struct ModelParams {
/// Constant added to every sample.
pub offset: f64,
/// Factor by which the Beta-distributed sample is multiplied.
pub scale: f64,
/// First shape parameter passed to `Beta::new`.
pub alpha: f64,
/// Second shape parameter passed to `Beta::new`.
pub beta: f64,
/// Time added to the arrival time of the previously queued message when a newly
/// pushed message on the same session would otherwise arrive before it.
pub collision: NotNan<f64>,
// Cached distribution. It is skipped by serde and rebuilt lazily from `alpha` and
// `beta` on the next call to `sample`.
#[serde(skip)]
dist: Option<Beta<f64>>,
}
impl PartialEq for ModelParams {
    /// Compare the five public parameters; the cached distribution is ignored.
    fn eq(&self, other: &Self) -> bool {
        let key = |p: &Self| (p.offset, p.scale, p.alpha, p.beta, p.collision);
        key(self) == key(other)
    }
}
impl ModelParams {
    /// Create new parameters and eagerly build the underlying Beta distribution.
    ///
    /// # Panics
    ///
    /// Panics if `collision` is NaN, or if `Beta::new` rejects `alpha` and `beta`.
    pub fn new(offset: f64, scale: f64, alpha: f64, beta: f64, collision: f64) -> Self {
        let collision = NotNan::new(collision).unwrap();
        let dist = Beta::new(alpha, beta).unwrap();
        Self {
            offset,
            scale,
            alpha,
            beta,
            collision,
            dist: Some(dist),
        }
    }

    /// Draw one sample: a Beta-distributed value multiplied by `scale`, plus `offset`.
    ///
    /// The distribution is built on first use if it is not cached yet (for instance
    /// after deserialization, which skips the cache).
    ///
    /// # Panics
    ///
    /// Panics if the distribution must be built and `Beta::new` rejects `alpha` and `beta`.
    pub fn sample<R: Rng + ?Sized>(&mut self, rng: &mut R) -> f64 {
        let (alpha, beta) = (self.alpha, self.beta);
        let dist = self
            .dist
            .get_or_insert_with(|| Beta::new(alpha, beta).unwrap());
        let unit_sample = dist.sample(rng);
        (unit_sample * self.scale) + self.offset
    }
}