use crate::{Bandwidth, BwTrace, Duration};
use dyn_clone::DynClone;
use rand::{rngs::StdRng, RngCore, SeedableRng};
use rand_distr::{Distribution, Normal};
const DEFAULT_RNG_SEED: u64 = 42;
/// A configuration object that can be converted into a bandwidth trace model.
///
/// Implementors must be `Send` and cloneable through [`DynClone`], so boxed
/// configurations can be duplicated. With the `serde` feature enabled the
/// trait is additionally annotated with `typetag::serde`.
#[cfg_attr(feature = "serde", typetag::serde)]
pub trait BwTraceConfig: DynClone + Send {
/// Consumes the boxed configuration and returns the boxed [`BwTrace`] model built from it.
fn into_model(self: Box<Self>) -> Box<dyn BwTrace>;
}
// Lets `Box<dyn BwTraceConfig>` be cloned (used when a repeated pattern
// re-instantiates a sub-model from its configuration).
dyn_clone::clone_trait_object!(BwTraceConfig);
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "truncated-normal")]
use super::solve_truncate::solve;
/// Bandwidth trace model that reports a single fixed bandwidth.
///
/// Its `BwTrace::next_bw` implementation yields `(bw, duration)` at most once:
/// the stored duration is taken on the first call, and a zero duration yields
/// nothing.
#[derive(Debug, Clone)]
pub struct StaticBw {
/// The bandwidth value reported by the trace.
pub bw: Bandwidth,
/// How long the bandwidth lasts; becomes `None` once it has been handed out.
pub duration: Option<Duration>,
}
/// Builder-style configuration for [`StaticBw`].
///
/// Every field is optional; `build` substitutes 12 Mbps for a missing
/// bandwidth and 1 second for a missing duration. With the `serde` feature
/// unset fields are skipped on serialization, and with the `human` feature the
/// values go through the `human_bandwidth` / `humantime_serde` adapters.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(default))]
#[derive(Debug, Clone, Default)]
pub struct StaticBwConfig {
/// Bandwidth of the static trace (`build` falls back to 12 Mbps).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub bw: Option<Bandwidth>,
/// Duration of the static trace (`build` falls back to 1 second).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "humantime_serde")
)]
pub duration: Option<Duration>,
}
/// Bandwidth trace model whose values are drawn from a normal distribution.
///
/// Each `next_bw` call draws one sample, clamps it to the optional bounds and
/// reports it for at most `step`, until `duration` is used up. The random
/// number generator type defaults to [`StdRng`].
#[derive(Debug, Clone)]
pub struct NormalizedBw<Rng = StdRng>
where
Rng: RngCore,
{
/// Mean of the sampled bandwidth.
pub mean: Bandwidth,
/// Standard deviation of the sampled bandwidth.
pub std_dev: Bandwidth,
/// Optional ceiling applied to every sample.
pub upper_bound: Option<Bandwidth>,
/// Optional floor applied to every sample.
pub lower_bound: Option<Bandwidth>,
/// Remaining time covered by the trace; decreases as values are emitted.
pub duration: Duration,
/// Maximum time span reported for one sampled value.
pub step: Duration,
/// Seed the random number generator was created from.
pub seed: u64,
// Random source used for sampling.
rng: Rng,
// Distribution built from `mean` and `std_dev`, both expressed in bps.
normal: Normal<f64>,
}
/// Builder-style configuration for [`NormalizedBw`].
///
/// Every field is optional; the defaults substituted by `build_with_rng` are
/// noted on each field. With the `serde` feature unset fields are skipped on
/// serialization, and with the `human` feature bandwidths and durations go
/// through the `human_bandwidth` / `humantime_serde` adapters.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(default))]
#[derive(Debug, Clone, Default)]
pub struct NormalizedBwConfig {
/// Mean bandwidth (defaults to 12 Mbps).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub mean: Option<Bandwidth>,
/// Standard deviation of the bandwidth (defaults to 0 Mbps).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub std_dev: Option<Bandwidth>,
/// Optional ceiling for generated values (no ceiling when unset).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub upper_bound: Option<Bandwidth>,
/// Optional floor for generated values (no floor when unset).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub lower_bound: Option<Bandwidth>,
/// Total duration of the trace (defaults to 1 second).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "humantime_serde")
)]
pub duration: Option<Duration>,
/// Time span of each generated value (defaults to 1 millisecond).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "humantime_serde")
)]
pub step: Option<Duration>,
/// Seed for the random number generator (defaults to `DEFAULT_RNG_SEED`).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub seed: Option<u64>,
}
/// Bandwidth trace model following a sawtooth wave with optional normal noise.
///
/// Within each `interval` the base bandwidth rises linearly from `bottom` to
/// `top` during the first `duty_ratio` fraction of the interval and falls back
/// towards `bottom` during the rest. A noise sample is added to every emitted
/// value. The random number generator type defaults to [`StdRng`].
#[derive(Debug, Clone)]
pub struct SawtoothBw<Rng = StdRng>
where
Rng: RngCore,
{
/// Lowest base bandwidth of the wave.
pub bottom: Bandwidth,
/// Highest base bandwidth of the wave.
pub top: Bandwidth,
/// Length of one sawtooth period.
pub interval: Duration,
/// Fraction of `interval` spent on the rising edge.
pub duty_ratio: f64,
/// Remaining time covered by the trace; decreases as values are emitted.
pub duration: Duration,
/// Maximum time span reported for one emitted value.
pub step: Duration,
/// Seed the random number generator was created from.
pub seed: u64,
/// Standard deviation of the additive noise.
pub std_dev: Bandwidth,
/// Optional cap on positive noise offsets.
pub upper_noise_bound: Option<Bandwidth>,
/// Optional cap on the magnitude of negative noise offsets.
pub lower_noise_bound: Option<Bandwidth>,
// Position inside the current sawtooth period.
current: Duration,
// Random source used to sample the noise.
rng: Rng,
// Zero-mean normal distribution the noise offsets (in bps) are drawn from.
noise: Normal<f64>,
}
/// Builder-style configuration for [`SawtoothBw`].
///
/// Every field is optional; the defaults substituted by `build_with_rng` are
/// noted on each field. With the `serde` feature unset fields are skipped on
/// serialization, and with the `human` feature bandwidths and durations go
/// through the `human_bandwidth` / `humantime_serde` adapters.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(default))]
#[derive(Debug, Clone, Default)]
pub struct SawtoothBwConfig {
/// Lowest base bandwidth (defaults to 0 Mbps).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub bottom: Option<Bandwidth>,
/// Highest base bandwidth (defaults to 12 Mbps).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub top: Option<Bandwidth>,
/// Length of one sawtooth period (defaults to 1 second).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "humantime_serde")
)]
pub interval: Option<Duration>,
/// Fraction of the period spent on the rising edge (defaults to 0.5).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub duty_ratio: Option<f64>,
/// Total duration of the trace (defaults to 1 second).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "humantime_serde")
)]
pub duration: Option<Duration>,
/// Time span of each generated value (defaults to 1 millisecond).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "humantime_serde")
)]
pub step: Option<Duration>,
/// Seed for the random number generator (defaults to `DEFAULT_RNG_SEED`).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub seed: Option<u64>,
/// Standard deviation of the additive noise (defaults to 0 Mbps).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub std_dev: Option<Bandwidth>,
/// Optional cap on positive noise offsets (unbounded when unset).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub upper_noise_bound: Option<Bandwidth>,
/// Optional cap on the magnitude of negative noise offsets (unbounded when unset).
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "human_bandwidth::serde")
)]
pub lower_noise_bound: Option<Bandwidth>,
}
/// Bandwidth trace model that plays a list of sub-traces in order, repeatedly.
///
/// Each sub-trace is instantiated from its configuration when its turn comes
/// and is replaced by the next one once it is exhausted.
pub struct RepeatedBwPattern {
/// Configurations of the sub-traces, in playback order.
pub pattern: Vec<Box<dyn BwTraceConfig>>,
/// Number of times the whole pattern is played; `0` means no limit.
pub count: usize,
// Model currently being drained; `None` until it is instantiated.
current_model: Option<Box<dyn BwTrace>>,
// Number of completed passes over `pattern`.
current_cycle: usize,
// Index into `pattern` of the sub-trace being played.
current_pattern: usize,
}
/// Builder-style configuration for [`RepeatedBwPattern`].
///
/// The derived default is an empty pattern with `count == 0`.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(default))]
#[derive(Default, Clone)]
pub struct RepeatedBwPatternConfig {
/// Configurations of the sub-traces, in playback order.
pub pattern: Vec<Box<dyn BwTraceConfig>>,
/// Number of times the whole pattern is played; `0` means no limit.
pub count: usize,
}
/// Bandwidth trace model that replays an explicit list of values.
///
/// `pattern` holds `(duration, bandwidths)` entries; every bandwidth of an
/// entry is emitted in order, each paired with that entry's duration.
/// `outer_index` selects the current entry and `inner_index` the current
/// bandwidth inside it.
pub struct TraceBw {
// Fields on the next line: the entries to replay and the index of the entry being replayed.
pub pattern: Vec<(Duration, Vec<Bandwidth>)>, pub outer_index: usize,
/// Index of the next bandwidth within the current entry.
pub inner_index: usize,
}
/// Builder-style configuration for [`TraceBw`].
///
/// With both the `serde` and `human` features enabled, the pattern is
/// (de)serialized through the `tracebw_serde` module.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(default))]
#[derive(Debug, Clone, Default)]
pub struct TraceBwConfig {
/// `(duration, bandwidths)` entries; each bandwidth lasts for the entry's duration.
#[cfg_attr(
all(feature = "serde", feature = "human"),
serde(with = "tracebw_serde")
)]
pub pattern: Vec<(Duration, Vec<Bandwidth>)>,
}
impl TraceBwConfig {
    /// Creates a configuration with an empty pattern.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the whole pattern with `pattern` and returns the updated configuration.
    pub fn pattern(self, pattern: Vec<(Duration, Vec<Bandwidth>)>) -> Self {
        Self { pattern }
    }

    /// Builds the [`TraceBw`] model, positioned at its first value.
    ///
    /// Entries whose bandwidth list is empty are dropped.
    pub fn build(self) -> TraceBw {
        let mut entries = self.pattern;
        entries.retain(|(_, bandwidths)| !bandwidths.is_empty());
        TraceBw {
            pattern: entries,
            outer_index: 0,
            inner_index: 0,
        }
    }
}
#[cfg(all(feature = "serde", feature = "human"))]
mod tracebw_serde {
    //! Human-readable (de)serialization of a `TraceBw` pattern.
    //!
    //! A pattern is written as a sequence of `[duration, [bandwidth, ...]]`
    //! pairs in which every duration and bandwidth is a string, formatted with
    //! `humantime` and `human_bandwidth` respectively.
    use super::*;
    use serde::ser::SerializeSeq;
    use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
    use std::fmt;
    use std::ops::{Deref, DerefMut};

    /// Upper bound on the number of entries pre-allocated from a sequence's
    /// size hint, so that a bogus hint in the input cannot force a huge
    /// allocation up front. Longer sequences still deserialize; the vector
    /// simply grows on demand.
    const MAX_PREALLOCATED_ENTRIES: usize = 4096;

    /// Entry point for `#[serde(with = "tracebw_serde")]`: deserializes through [`Serde`].
    pub fn deserialize<'a, T, D>(d: D) -> Result<T, D::Error>
    where
        Serde<T>: Deserialize<'a>,
        D: Deserializer<'a>,
    {
        Serde::deserialize(d).map(Serde::into_inner)
    }

    /// Entry point for `#[serde(with = "tracebw_serde")]`: serializes through [`Serde`].
    pub fn serialize<T, S>(d: &T, s: S) -> Result<S::Ok, S::Error>
    where
        for<'a> Serde<&'a T>: Serialize,
        S: Serializer,
    {
        Serde::from(d).serialize(s)
    }

    /// Transparent wrapper that carries the human-readable serde implementations.
    #[derive(Copy, Clone, Eq, Hash, PartialEq)]
    pub struct Serde<T>(T);

    impl<T> fmt::Debug for Serde<T>
    where
        T: fmt::Debug,
    {
        fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
            self.0.fmt(formatter)
        }
    }

    impl<T> Deref for Serde<T> {
        type Target = T;

        fn deref(&self) -> &T {
            &self.0
        }
    }

    impl<T> DerefMut for Serde<T> {
        fn deref_mut(&mut self) -> &mut T {
            &mut self.0
        }
    }

    impl<T> Serde<T> {
        /// Unwraps the inner value.
        pub fn into_inner(self) -> T {
            self.0
        }
    }

    impl<T> From<T> for Serde<T> {
        fn from(val: T) -> Serde<T> {
            Serde(val)
        }
    }

    impl<'de> Deserialize<'de> for Serde<Vec<(Duration, Vec<Bandwidth>)>> {
        fn deserialize<D>(d: D) -> Result<Serde<Vec<(Duration, Vec<Bandwidth>)>>, D::Error>
        where
            D: Deserializer<'de>,
        {
            struct V;

            impl<'de> de::Visitor<'de> for V {
                type Value = Vec<(Duration, Vec<Bandwidth>)>;

                fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
                    fmt.write_str("a sequence of [str, [str, str, ...]]")
                }

                fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
                where
                    A: serde::de::SeqAccess<'de>,
                {
                    // The hint comes from the input, so clamp it before trusting
                    // it with an allocation.
                    let capacity = seq
                        .size_hint()
                        .unwrap_or(0)
                        .min(MAX_PREALLOCATED_ENTRIES);
                    let mut pattern = Vec::with_capacity(capacity);
                    while let Some((duration_str, bandwidths_str)) =
                        seq.next_element::<(String, Vec<String>)>()?
                    {
                        let duration =
                            humantime_serde::re::humantime::parse_duration(duration_str.as_str())
                                .map_err(|e| {
                                    serde::de::Error::custom(format!(
                                        "Failed to parse duration '{}': {}",
                                        duration_str, e
                                    ))
                                })?;
                        let bandwidths = bandwidths_str
                            .into_iter()
                            .map(|b| {
                                human_bandwidth::parse_bandwidth(&b).map_err(|e| {
                                    serde::de::Error::custom(format!(
                                        "Failed to parse bandwidth '{}': {}",
                                        b, e
                                    ))
                                })
                            })
                            .collect::<Result<Vec<_>, _>>()?;
                        pattern.push((duration, bandwidths));
                    }
                    Ok(pattern)
                }
            }

            d.deserialize_seq(V).map(Serde)
        }
    }

    /// Writes `pattern` as a sequence of `(duration, [bandwidth, ...])` string pairs.
    ///
    /// Shared by the owned and the borrowed `Serialize` implementations so the
    /// two cannot drift apart.
    fn serialize_pattern<S>(
        pattern: &[(Duration, Vec<Bandwidth>)],
        serializer: S,
    ) -> Result<S::Ok, S::Error>
    where
        S: ser::Serializer,
    {
        let mut seq = serializer.serialize_seq(Some(pattern.len()))?;
        for (duration, bandwidths) in pattern {
            let time = humantime_serde::re::humantime::format_duration(*duration).to_string();
            let bandwidths = bandwidths
                .iter()
                .map(|item| human_bandwidth::format_bandwidth(*item).to_string())
                .collect::<Vec<_>>();
            seq.serialize_element(&(time, bandwidths))?;
        }
        seq.end()
    }

    impl ser::Serialize for Serde<Vec<(Duration, Vec<Bandwidth>)>> {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: ser::Serializer,
        {
            serialize_pattern(&self.0, serializer)
        }
    }

    impl ser::Serialize for Serde<&Vec<(Duration, Vec<Bandwidth>)>> {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: ser::Serializer,
        {
            serialize_pattern(self.0, serializer)
        }
    }
}
impl BwTrace for StaticBw {
    /// Yields `(bw, duration)` on the first call and `None` afterwards.
    ///
    /// The stored duration is consumed by the call; a zero duration produces
    /// `None` straight away.
    fn next_bw(&mut self) -> Option<(Bandwidth, Duration)> {
        self.duration
            .take()
            .filter(|remaining| !remaining.is_zero())
            .map(|remaining| (self.bw, remaining))
    }
}
impl<Rng: RngCore + Send> BwTrace for NormalizedBw<Rng> {
    /// Draws the next bandwidth value, or returns `None` once the trace duration is used up.
    ///
    /// The sample is converted to whole bits per second, raised to
    /// `lower_bound` and then capped at `upper_bound` when those are set. The
    /// value is reported for `step`, or for whatever duration is left if that
    /// is shorter.
    fn next_bw(&mut self) -> Option<(Bandwidth, Duration)> {
        if self.duration.is_zero() {
            return None;
        }

        let sampled = Bandwidth::from_bps(self.sample() as u64);
        let floored = match self.lower_bound {
            Some(floor) => sampled.max(floor),
            None => sampled,
        };
        let clamped = match self.upper_bound {
            Some(ceiling) => floored.min(ceiling),
            None => floored,
        };

        let span = self.step.min(self.duration);
        self.duration -= span;
        Some((clamped, span))
    }
}
impl<Rng: RngCore + Send> BwTrace for SawtoothBw<Rng> {
fn next_bw(&mut self) -> Option<(Bandwidth, Duration)> {
if self.duration.is_zero() {
None
} else {
let current = self.current.as_secs_f64();
let change_point = self.interval.as_secs_f64() * self.duty_ratio;
let base_bw = if current < change_point {
let ratio = current / change_point;
self.bottom + (self.top - self.bottom).mul_f64(ratio)
} else {
let ratio = (current - change_point) / (self.interval.as_secs_f64() - change_point);
self.top - (self.top - self.bottom).mul_f64(ratio)
};
let mut offset = self.noise.sample(&mut self.rng);
if let Some(upper_noise_bound) = self.upper_noise_bound {
offset = offset.min(upper_noise_bound.as_bps() as f64);
}
if let Some(lower_noise_bound) = self.lower_noise_bound {
offset = offset.max(-(lower_noise_bound.as_bps() as f64));
}
let bw = Bandwidth::from_bps((base_bw.as_bps() as f64 + offset) as u64);
let duration = self.step.min(self.duration);
self.duration -= duration;
self.current += duration;
if self.current >= self.interval {
self.current -= self.interval;
}
Some((bw, duration))
}
}
}
impl BwTrace for RepeatedBwPattern {
    /// Returns the next value of the repeated pattern.
    ///
    /// Sub-traces are instantiated lazily from their configurations and played
    /// in order; when the last one is exhausted a new cycle starts. Returns
    /// `None` when the pattern is empty, when `count` cycles have completed
    /// (`count == 0` means no limit), or when a full pass over freshly
    /// instantiated sub-traces produced no value at all.
    fn next_bw(&mut self) -> Option<(Bandwidth, Duration)> {
        // Number of consecutive, freshly instantiated sub-traces that yielded
        // nothing. Once it reaches the pattern length, another pass would be
        // just as empty, so stop instead of spinning forever.
        let mut fresh_empty_models = 0usize;

        // Iterating (instead of recursing once per exhausted sub-trace) keeps
        // the stack depth constant however many empty sub-traces are skipped.
        loop {
            if self.pattern.is_empty() || (self.count != 0 && self.current_cycle >= self.count) {
                return None;
            }

            let was_fresh = self.current_model.is_none();
            let config = &self.pattern[self.current_pattern];
            let model = self
                .current_model
                .get_or_insert_with(|| config.clone().into_model());
            if let Some(item) = model.next_bw() {
                return Some(item);
            }

            // Current sub-trace is exhausted: advance to the next one.
            self.current_model = None;
            self.current_pattern += 1;
            if self.current_pattern >= self.pattern.len() {
                self.current_pattern = 0;
                self.current_cycle += 1;
            }

            if was_fresh {
                fresh_empty_models += 1;
                if fresh_empty_models >= self.pattern.len() {
                    return None;
                }
            }
        }
    }
}
impl BwTrace for TraceBw {
    /// Returns the bandwidth at the current position, paired with its entry's
    /// duration, and advances the position.
    ///
    /// Yields `None` when `outer_index` is past the last entry or
    /// `inner_index` is past the end of the current entry's bandwidth list; in
    /// that case the position is left untouched.
    fn next_bw(&mut self) -> Option<(Bandwidth, Duration)> {
        let (duration, bandwidths) = self.pattern.get(self.outer_index)?;
        let item = (*bandwidths.get(self.inner_index)?, *duration);

        let has_more_in_entry = self.inner_index + 1 < bandwidths.len();
        if has_more_in_entry {
            self.inner_index += 1;
        } else {
            // Entry finished: move on to the start of the next one.
            self.outer_index += 1;
            self.inner_index = 0;
        }
        Some(item)
    }
}
impl<Rng: RngCore> NormalizedBw<Rng> {
pub fn sample(&mut self) -> f64 {
self.normal.sample(&mut self.rng)
}
}
impl StaticBwConfig {
    /// Creates a configuration with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the bandwidth of the trace.
    pub fn bw(self, bw: Bandwidth) -> Self {
        Self {
            bw: Some(bw),
            ..self
        }
    }

    /// Sets the duration of the trace.
    pub fn duration(self, duration: Duration) -> Self {
        Self {
            duration: Some(duration),
            ..self
        }
    }

    /// Builds the [`StaticBw`] model.
    ///
    /// A missing bandwidth becomes 12 Mbps and a missing duration 1 second.
    pub fn build(self) -> StaticBw {
        let Self { bw, duration } = self;
        StaticBw {
            bw: bw.unwrap_or(Bandwidth::from_mbps(12)),
            duration: duration.or(Some(Duration::from_secs(1))),
        }
    }
}
// Converts a bandwidth expression into a bits-per-second integer, saturating
// instead of overflowing: the whole-Gbps part (`as_gbps()`) is multiplied by
// 1_000_000_000 and the sub-Gbps remainder (`subgbps_bps()`) is added, both
// with saturating arithmetic.
// NOTE(review): the result type follows `as_gbps()`'s return type, presumably
// `u64` as the macro name suggests; confirm against `Bandwidth`.
macro_rules! saturating_bandwidth_as_bps_u64 {
($bw:expr) => {
$bw.as_gbps()
.saturating_mul(1_000_000_000)
.saturating_add($bw.subgbps_bps() as u64)
};
}
impl NormalizedBwConfig {
    /// Creates a configuration with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the mean bandwidth.
    pub fn mean(self, mean: Bandwidth) -> Self {
        Self {
            mean: Some(mean),
            ..self
        }
    }

    /// Sets the standard deviation of the bandwidth.
    pub fn std_dev(self, std_dev: Bandwidth) -> Self {
        Self {
            std_dev: Some(std_dev),
            ..self
        }
    }

    /// Sets the ceiling applied to generated values.
    pub fn upper_bound(self, upper_bound: Bandwidth) -> Self {
        Self {
            upper_bound: Some(upper_bound),
            ..self
        }
    }

    /// Sets the floor applied to generated values.
    pub fn lower_bound(self, lower_bound: Bandwidth) -> Self {
        Self {
            lower_bound: Some(lower_bound),
            ..self
        }
    }

    /// Sets the total duration of the trace.
    pub fn duration(self, duration: Duration) -> Self {
        Self {
            duration: Some(duration),
            ..self
        }
    }

    /// Sets the time span of each generated value.
    pub fn step(self, step: Duration) -> Self {
        Self {
            step: Some(step),
            ..self
        }
    }

    /// Sets the seed of the random number generator.
    pub fn seed(self, seed: u64) -> Self {
        Self {
            seed: Some(seed),
            ..self
        }
    }

    /// Picks a random seed; it is stored in the configuration.
    pub fn random_seed(self) -> Self {
        Self {
            seed: Some(rand::random()),
            ..self
        }
    }

    /// Builds the model with the default random number generator.
    pub fn build(self) -> NormalizedBw {
        self.build_with_rng()
    }

    /// Builds the model with a caller-chosen random number generator type.
    ///
    /// Unset fields fall back to: mean 12 Mbps, standard deviation 0 Mbps,
    /// duration 1 second, step 1 millisecond, seed `DEFAULT_RNG_SEED`. The
    /// bounds stay unset when not configured.
    pub fn build_with_rng<Rng: SeedableRng + RngCore>(self) -> NormalizedBw<Rng> {
        let Self {
            mean,
            std_dev,
            upper_bound,
            lower_bound,
            duration,
            step,
            seed,
        } = self;

        let mean = mean.unwrap_or(Bandwidth::from_mbps(12));
        let std_dev = std_dev.unwrap_or(Bandwidth::from_mbps(0));
        let seed = seed.unwrap_or(DEFAULT_RNG_SEED);

        // The distribution works on plain bits-per-second values.
        let normal: Normal<f64> = Normal::new(
            saturating_bandwidth_as_bps_u64!(mean) as f64,
            saturating_bandwidth_as_bps_u64!(std_dev) as f64,
        )
        .unwrap();

        NormalizedBw {
            mean,
            std_dev,
            upper_bound,
            lower_bound,
            duration: duration.unwrap_or(Duration::from_secs(1)),
            step: step.unwrap_or(Duration::from_millis(1)),
            seed,
            rng: Rng::seed_from_u64(seed),
            normal,
        }
    }
}
#[cfg(feature = "truncated-normal")]
impl NormalizedBwConfig {
    /// Like `build`, but first adjusts the configured mean through `solve`.
    pub fn build_truncated(self) -> NormalizedBw {
        self.build_truncated_with_rng()
    }

    /// Like `build_with_rng`, but first adjusts the configured mean through `solve`.
    ///
    /// The standard deviation and the bounds are expressed relative to the
    /// mean (default 12 Mbps; a missing lower bound counts as 0), handed to
    /// `solve` together with a unit mean, and the mean is multiplied by the
    /// factor `solve` returns. The mean stays unchanged when `solve` yields no
    /// result.
    // NOTE(review): presumably this compensates for the shift that clamping
    // introduces in the effective mean; confirm against `solve_truncate`.
    pub fn build_truncated_with_rng<Rng: SeedableRng + RngCore>(self) -> NormalizedBw<Rng> {
        let zero = Bandwidth::from_mbps(0);
        let mean_gbps = self
            .mean
            .unwrap_or(Bandwidth::from_mbps(12))
            .as_gbps_f64();
        let relative_to_mean = |bw: Bandwidth| bw.as_gbps_f64() / mean_gbps;

        let sigma = relative_to_mean(self.std_dev.unwrap_or(zero));
        let lower = relative_to_mean(self.lower_bound.unwrap_or(zero));
        let upper = self.upper_bound.map(relative_to_mean);

        let factor = solve(1f64, sigma, Some(lower), upper).unwrap_or(1f64);
        let adjusted = Self {
            mean: Some(Bandwidth::from_gbps_f64(mean_gbps * factor)),
            ..self
        };
        adjusted.build_with_rng()
    }
}
impl SawtoothBwConfig {
    /// Creates a configuration with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the lowest base bandwidth of the wave.
    pub fn bottom(self, bottom: Bandwidth) -> Self {
        Self {
            bottom: Some(bottom),
            ..self
        }
    }

    /// Sets the highest base bandwidth of the wave.
    pub fn top(self, top: Bandwidth) -> Self {
        Self {
            top: Some(top),
            ..self
        }
    }

    /// Sets the length of one sawtooth period.
    pub fn interval(self, interval: Duration) -> Self {
        Self {
            interval: Some(interval),
            ..self
        }
    }

    /// Sets the fraction of the period spent on the rising edge.
    pub fn duty_ratio(self, duty_ratio: f64) -> Self {
        Self {
            duty_ratio: Some(duty_ratio),
            ..self
        }
    }

    /// Sets the total duration of the trace.
    pub fn duration(self, duration: Duration) -> Self {
        Self {
            duration: Some(duration),
            ..self
        }
    }

    /// Sets the time span of each generated value.
    pub fn step(self, step: Duration) -> Self {
        Self {
            step: Some(step),
            ..self
        }
    }

    /// Sets the seed of the random number generator.
    pub fn seed(self, seed: u64) -> Self {
        Self {
            seed: Some(seed),
            ..self
        }
    }

    /// Picks a random seed; it is stored in the configuration.
    pub fn random_seed(self) -> Self {
        Self {
            seed: Some(rand::random()),
            ..self
        }
    }

    /// Sets the standard deviation of the additive noise.
    pub fn std_dev(self, std_dev: Bandwidth) -> Self {
        Self {
            std_dev: Some(std_dev),
            ..self
        }
    }

    /// Sets the cap on positive noise offsets.
    pub fn upper_noise_bound(self, upper_noise_bound: Bandwidth) -> Self {
        Self {
            upper_noise_bound: Some(upper_noise_bound),
            ..self
        }
    }

    /// Sets the cap on the magnitude of negative noise offsets.
    pub fn lower_noise_bound(self, lower_noise_bound: Bandwidth) -> Self {
        Self {
            lower_noise_bound: Some(lower_noise_bound),
            ..self
        }
    }

    /// Builds the model with the default random number generator.
    ///
    /// # Panics
    ///
    /// Panics if `bottom` is greater than `top`.
    pub fn build(self) -> SawtoothBw {
        self.build_with_rng()
    }

    /// Builds the model with a caller-chosen random number generator type.
    ///
    /// Unset fields fall back to: bottom 0 Mbps, top 12 Mbps, interval
    /// 1 second, duty ratio 0.5, duration 1 second, step 1 millisecond, seed
    /// `DEFAULT_RNG_SEED`, noise standard deviation 0 Mbps. The noise bounds
    /// stay unset when not configured.
    ///
    /// # Panics
    ///
    /// Panics if `bottom` is greater than `top`.
    pub fn build_with_rng<Rng: RngCore + SeedableRng>(self) -> SawtoothBw<Rng> {
        let Self {
            bottom,
            top,
            interval,
            duty_ratio,
            duration,
            step,
            seed,
            std_dev,
            upper_noise_bound,
            lower_noise_bound,
        } = self;

        let bottom = bottom.unwrap_or(Bandwidth::from_mbps(0));
        let top = top.unwrap_or(Bandwidth::from_mbps(12));
        if bottom > top {
            panic!("SawtoothBw: bottom bw must be less than top bw");
        }

        let seed = seed.unwrap_or(DEFAULT_RNG_SEED);
        let std_dev = std_dev.unwrap_or(Bandwidth::from_mbps(0));
        // Noise is zero-mean and expressed in bits per second.
        let noise: Normal<f64> =
            Normal::new(0.0, saturating_bandwidth_as_bps_u64!(std_dev) as f64).unwrap();

        SawtoothBw {
            bottom,
            top,
            interval: interval.unwrap_or(Duration::from_secs(1)),
            duty_ratio: duty_ratio.unwrap_or(0.5),
            duration: duration.unwrap_or(Duration::from_secs(1)),
            step: step.unwrap_or(Duration::from_millis(1)),
            seed,
            std_dev,
            upper_noise_bound,
            lower_noise_bound,
            current: Duration::ZERO,
            rng: Rng::seed_from_u64(seed),
            noise,
        }
    }
}
impl RepeatedBwPatternConfig {
    /// Creates a configuration with an empty pattern and `count == 0`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the list of sub-trace configurations.
    pub fn pattern(self, pattern: Vec<Box<dyn BwTraceConfig>>) -> Self {
        Self { pattern, ..self }
    }

    /// Sets how many times the pattern is played; `0` means no limit.
    pub fn count(self, count: usize) -> Self {
        Self { count, ..self }
    }

    /// Builds the [`RepeatedBwPattern`] model, positioned at the first
    /// sub-trace of the first cycle with no sub-trace instantiated yet.
    pub fn build(self) -> RepeatedBwPattern {
        let Self { pattern, count } = self;
        RepeatedBwPattern {
            pattern,
            count,
            current_model: None,
            current_cycle: 0,
            current_pattern: 0,
        }
    }
}
// Implements `BwTraceConfig` for the configuration type `$name`:
// `into_model` calls the type's inherent `build()` and boxes the resulting
// model. With the `serde` feature the impl is annotated with `typetag::serde`.
macro_rules! impl_bw_trace_config {
($name:ident) => {
#[cfg_attr(feature = "serde", typetag::serde)]
impl BwTraceConfig for $name {
fn into_model(self: Box<$name>) -> Box<dyn BwTrace> {
Box::new(self.build())
}
}
};
}
// Every configuration type in this file can be used as a `dyn BwTraceConfig`.
impl_bw_trace_config!(StaticBwConfig);
impl_bw_trace_config!(NormalizedBwConfig);
impl_bw_trace_config!(SawtoothBwConfig);
impl_bw_trace_config!(RepeatedBwPatternConfig);
impl_bw_trace_config!(TraceBwConfig);
/// Turns a bandwidth trace configuration into one that repeats without limit.
pub trait Forever: BwTraceConfig {
/// Returns a [`RepeatedBwPatternConfig`] whose `count` is `0` (no repetition limit).
fn forever(self) -> RepeatedBwPatternConfig;
}
/// Implements [`Forever`] for the configuration type `$name` by wrapping it in
/// a single-element [`RepeatedBwPatternConfig`] with `count` set to `0`.
///
/// The expansion refers to `Forever` and `RepeatedBwPatternConfig` by their
/// bare names, so both must be in scope where the macro is invoked.
#[macro_export]
macro_rules! impl_forever {
($name:ident) => {
impl Forever for $name {
fn forever(self) -> RepeatedBwPatternConfig {
RepeatedBwPatternConfig::new()
.pattern(vec![Box::new(self)])
.count(0)
}
}
};
}
// Non-repeating configurations gain `forever()` through the macro.
impl_forever!(StaticBwConfig);
impl_forever!(NormalizedBwConfig);
impl_forever!(SawtoothBwConfig);
impl_forever!(TraceBwConfig);
impl Forever for RepeatedBwPatternConfig {
    /// Removes the repetition limit by resetting `count` to `0`; the pattern
    /// itself is kept as is rather than being wrapped again.
    fn forever(self) -> RepeatedBwPatternConfig {
        Self { count: 0, ..self }
    }
}