//! Universal Scalability Law modelling.
//!
//! A [`Measurement`] records a system's concurrency, throughput, and latency at one point in
//! time; a [`Model`] is fitted to a set of measurements and can then be queried for the
//! throughput, latency, or concurrency it predicts under other conditions.

#![forbid(unsafe_code)]
#![warn(
missing_docs,
rust_2018_idioms,
trivial_casts,
unused_lifetimes,
unused_qualifications,
missing_copy_implementations,
missing_debug_implementations,
clippy::cognitive_complexity,
clippy::missing_const_for_fn,
clippy::needless_borrow
)]
use std::iter::FromIterator;
use std::time::Duration;
use approx::relative_eq;
use rmpfit::{MPFitter, MPResult};
/// A simultaneous observation of a system's concurrency, throughput, and latency.
///
/// The constructors on this type each take two of the three quantities and derive the third
/// from them, so that `n = x * r` holds for the stored values.
#[derive(Debug, Copy, Clone)]
pub struct Measurement {
/// Concurrency: the number of concurrent workers/events at the time of the measurement.
pub n: f64,
/// Throughput: events per unit of time.
pub x: f64,
/// Latency; expressed in seconds when it was supplied as a `Duration`.
pub r: f64,
}
impl Measurement {
    /// Creates a measurement from a concurrency level and the latency observed at that level.
    ///
    /// The latency is converted to seconds and the throughput is derived as `n / r`
    /// (Little's Law).
    pub fn concurrency_and_latency(n: u32, r: Duration) -> Measurement {
        let concurrency = f64::from(n);
        let latency = r.as_secs_f64();
        let throughput = concurrency / latency;
        Measurement { n: concurrency, x: throughput, r: latency }
    }

    /// Creates a measurement from a concurrency level and the throughput observed at that level.
    ///
    /// The latency is derived as `n / x` (Little's Law).
    pub fn concurrency_and_throughput(n: u32, x: f64) -> Measurement {
        let concurrency = f64::from(n);
        let latency = concurrency / x;
        Measurement { n: concurrency, x, r: latency }
    }

    /// Creates a measurement from a throughput and the latency observed at that throughput.
    ///
    /// The latency is converted to seconds and the concurrency is derived as `x * r`
    /// (Little's Law); the result is generally not a whole number.
    pub fn throughput_and_latency(x: f64, r: Duration) -> Measurement {
        let latency = r.as_secs_f64();
        let concurrency = x * latency;
        Measurement { n: concurrency, x, r: latency }
    }
}
// Implements `From` for `Measurement` on the tuple types `($a, $b)` and `($b, $a)`.
//
// `$f` is always called with the `$a` value first and the `$b` value second, whichever order
// the two appear in inside the tuple.
macro_rules! from_tuple {
    ($a:ty, $b:ty, $f:expr) => {
        impl From<($a, $b)> for Measurement {
            fn from((first, second): ($a, $b)) -> Self {
                $f(first, second)
            }
        }

        impl From<($b, $a)> for Measurement {
            fn from((second, first): ($b, $a)) -> Self {
                $f(first, second)
            }
        }
    };
}
// Allow a `Measurement` to be created with `From`/`Into` from a two-element tuple, in either
// element order. The pair of element types selects which two quantities are being supplied and
// therefore which constructor derives the third.
from_tuple!(u32, f64, Measurement::concurrency_and_throughput);
from_tuple!(u32, Duration, Measurement::concurrency_and_latency);
from_tuple!(f64, Duration, Measurement::throughput_and_latency);
/// A Universal Scalability Law model of a system.
///
/// The three coefficients describe throughput as a function of concurrency `n`:
/// `lambda * n / (1 + sigma * (n - 1) + kappa * n * (n - 1))`.
#[derive(Debug, Copy, Clone)]
pub struct Model {
/// The contention coefficient: the weight of the term that grows linearly with `n - 1`.
pub sigma: f64,
/// The coherency coefficient: the weight of the term that grows with `n * (n - 1)`.
pub kappa: f64,
/// The throughput the model predicts at a concurrency of 1.
pub lambda: f64,
}
/// The smallest number of measurements [`Model::build`] accepts; it panics when given fewer.
pub const MIN_MEASUREMENTS: usize = 6;
impl Model {
    /// Fits a model to the given measurements.
    ///
    /// The coefficients start from a fixed initial guess and are refined by `rmpfit`, using the
    /// difference between each measured throughput and the throughput the candidate model
    /// predicts at the same concurrency.
    ///
    /// # Panics
    ///
    /// Panics if fewer than [`MIN_MEASUREMENTS`] measurements are supplied, or if the fitting
    /// routine reports an error.
    pub fn build(measurements: &[Measurement]) -> Model {
        let supplied = measurements.len();
        assert!(
            supplied >= MIN_MEASUREMENTS,
            "must have at least {} measurements",
            MIN_MEASUREMENTS
        );

        // The fitter owns its data, so the slice is copied once here.
        let fitter = ModelFitter(measurements.to_vec());
        let mut params = fitter.init_params();
        let outcome = fitter.mpfit(&mut params, None, &Default::default());
        if let Err(err) = outcome {
            panic!("lma error: {}", err)
        }

        // `params` is laid out as `[sigma, kappa, lambda]`.
        let (sigma, kappa, lambda) = (params[0], params[1], params[2]);
        Model { sigma, kappa, lambda }
    }

    /// Returns the throughput the model predicts at concurrency `n`:
    /// `lambda * n / (1 + sigma * (n - 1) + kappa * n * (n - 1))`.
    pub fn throughput_at_concurrency(&self, n: u32) -> f64 {
        let n = f64::from(n);
        let denominator = 1.0 + (self.sigma * (n - 1.0)) + (self.kappa * n * (n - 1.0));
        (self.lambda * n) / denominator
    }

    /// Returns the latency the model predicts at concurrency `n`:
    /// `(1 + sigma * (n - 1) + kappa * n * (n - 1)) / lambda`.
    pub fn latency_at_concurrency(&self, n: u32) -> f64 {
        let n = f64::from(n);
        let numerator = 1.0 + (self.sigma * (n - 1.0)) + (self.kappa * n * (n - 1.0));
        numerator / self.lambda
    }

    /// Returns the concurrency at which predicted throughput peaks:
    /// `floor(sqrt((1 - sigma) / kappa))`.
    pub fn max_concurrency(&self) -> u32 {
        let peak = ((1.0 - self.sigma) / self.kappa).sqrt();
        // A float-to-integer `as` cast saturates at the `u32` bounds and maps NaN to 0, so a
        // zero `kappa` yields `u32::MAX` rather than undefined behaviour.
        peak.floor() as u32
    }

    /// Returns the predicted throughput at [`Model::max_concurrency`].
    pub fn max_throughput(&self) -> f64 {
        let peak = self.max_concurrency();
        self.throughput_at_concurrency(peak)
    }

    /// Returns the latency the model predicts at throughput `x`:
    /// `(sigma - 1) / (sigma * x - lambda)`.
    ///
    /// NOTE(review): `kappa` does not appear in this expression, so it is not the inverse of
    /// [`Model::throughput_at_latency`]; confirm this is the intended approximation.
    pub fn latency_at_throughput(&self, x: f64) -> f64 {
        let numerator = self.sigma - 1.0;
        let denominator = self.sigma * x - self.lambda;
        numerator / denominator
    }

    /// Returns the throughput the model predicts at latency `r`:
    /// `(sqrt(sigma² + kappa² + 2 * kappa * (2 * lambda * r + sigma - 2)) - kappa + sigma)
    /// / (2 * kappa * r)`, with `r` in seconds.
    ///
    /// NOTE(review): this is not equal to `concurrency_at_latency(r) / r` (the `kappa` and
    /// `sigma` terms outside the square root have the opposite sign); confirm against the
    /// reference derivation before relying on the two being consistent.
    pub fn throughput_at_latency(&self, r: Duration) -> f64 {
        let secs = r.as_secs_f64();
        let Model { sigma, kappa, lambda } = *self;
        let discriminant =
            sigma.powi(2) + kappa.powi(2) + 2.0 * kappa * (2.0 * lambda * secs + sigma - 2.0);
        (discriminant.sqrt() - kappa + sigma) / (2.0 * kappa * secs)
    }

    /// Returns the concurrency at which the model predicts latency `r`.
    ///
    /// This is the larger root of `latency_at_concurrency(n) = r` solved for a real-valued `n`:
    /// `(kappa - sigma + sqrt(sigma² + kappa² + 2 * kappa * (2 * lambda * r + sigma - 2)))
    /// / (2 * kappa)`, with `r` in seconds.
    pub fn concurrency_at_latency(&self, r: Duration) -> f64 {
        let secs = r.as_secs_f64();
        let Model { sigma, kappa, lambda } = *self;
        let discriminant =
            sigma.powi(2) + kappa.powi(2) + 2.0 * kappa * ((2.0 * lambda * secs) + sigma - 2.0);
        (kappa - sigma + discriminant.sqrt()) / (2.0 * kappa)
    }

    /// Returns the concurrency the model predicts at throughput `x`, computed as
    /// `latency_at_throughput(x) * x` (Little's Law).
    pub fn concurrency_at_throughput(&self, x: f64) -> f64 {
        let latency = self.latency_at_throughput(x);
        latency * x
    }

    /// Returns `true` if the contention coefficient is larger than the coherency coefficient.
    pub fn is_contention_constrained(&self) -> bool {
        self.kappa < self.sigma
    }

    /// Returns `true` if the coherency coefficient is larger than the contention coefficient.
    pub fn is_coherency_constrained(&self) -> bool {
        self.kappa > self.sigma
    }

    /// Returns `true` if the coherency coefficient is approximately zero, as judged by
    /// `approx::relative_eq!` with its default tolerances.
    pub fn is_limitless(&self) -> bool {
        relative_eq!(self.kappa, 0.0)
    }
}
/// Collects an iterator of measurements into a fitted model.
///
/// The measurements are buffered and handed to [`Model::build`], so collecting panics under the
/// same conditions as `build` does.
impl FromIterator<Measurement> for Model {
    fn from_iter<T: IntoIterator<Item = Measurement>>(iter: T) -> Self {
        let buffered = Vec::from_iter(iter);
        Model::build(buffered.as_slice())
    }
}
// Implements `FromIterator<&($a, $b)>` for `Model`.
//
// Each borrowed tuple is copied, converted into a `Measurement` through the matching `From`
// impl, and the resulting measurements are passed to `Model::build`.
macro_rules! from_iterator {
    ($a:ty, $b:ty) => {
        impl<'a> FromIterator<&'a ($a, $b)> for Model {
            fn from_iter<T: IntoIterator<Item = &'a ($a, $b)>>(iter: T) -> Self {
                let measurements = iter
                    .into_iter()
                    .copied()
                    .map(Measurement::from)
                    .collect::<Vec<Measurement>>();
                Model::build(&measurements)
            }
        }
    };
}
// Let a `Model` be collected directly from an iterator of borrowed tuples, for every tuple
// shape that converts into a `Measurement` (each pairing of element types, in both orders).
from_iterator!(u32, f64);
from_iterator!(f64, u32);
from_iterator!(Duration, u32);
from_iterator!(u32, Duration);
from_iterator!(f64, Duration);
from_iterator!(Duration, f64);
// Owns the measurements being fitted and exposes them to `rmpfit` through `MPFitter`.
struct ModelFitter(Vec<Measurement>);
impl ModelFitter {
    /// Returns the starting point for the fit, laid out as `[sigma, kappa, lambda]`.
    ///
    /// `sigma` and `kappa` start at fixed values (0.1 and 0.01); `lambda` starts at the largest
    /// per-unit-of-concurrency throughput (`x / n`) found in the measurements, or negative
    /// infinity when there are none.
    fn init_params(&self) -> Vec<f64> {
        let mut best_ratio = f64::NEG_INFINITY;
        for m in &self.0 {
            best_ratio = best_ratio.max(m.x / m.n);
        }
        vec![0.1, 0.01, best_ratio]
    }
}
impl MPFitter for ModelFitter {
    /// Fills `deviates` with the residual of each measurement under the candidate parameters.
    ///
    /// `params` is laid out as `[sigma, kappa, lambda]`. Each residual is the measured
    /// throughput minus the throughput the Universal Scalability Law predicts at the
    /// measurement's concurrency: `lambda * n / (1 + sigma * (n - 1) + kappa * n * (n - 1))`.
    fn eval(&self, params: &[f64], deviates: &mut [f64]) -> MPResult<()> {
        let (sigma, kappa, lambda) = (params[0], params[1], params[2]);
        for (deviate, m) in deviates.iter_mut().zip(&self.0) {
            // Evaluate the curve at the measured concurrency as an `f64`. Routing this through
            // the public `u32`-based accessor would need an `as u32` cast, which truncates the
            // fractional concurrencies produced by `Measurement::throughput_and_latency`
            // (`n = x * r`) and so compares the measurement against the wrong point on the
            // curve. The expression keeps the same operation order as the public accessor, so
            // whole-number concurrencies give bit-identical residuals.
            let n = m.n;
            let predicted = (lambda * n) / (1.0 + (sigma * (n - 1.0)) + (kappa * n * (n - 1.0)));
            *deviate = m.x - predicted;
        }
        Ok(())
    }

    /// Returns the number of measurements, i.e. the number of residuals `eval` produces.
    fn number_of_points(&self) -> usize {
        self.0.len()
    }
}
#[cfg(test)]
mod tests {
    use approx::assert_relative_eq;

    use super::*;

    /// Relative tolerance used when comparing fitted values against rounded reference figures.
    const ACCURACY: f64 = 0.00001;

    /// Reference data set: `(concurrency, throughput)` pairs for concurrency levels 1 to 32.
    const MEASUREMENTS: [(u32, f64); 32] = [
        (1, 955.16),
        (2, 1878.91),
        (3, 2688.01),
        (4, 3548.68),
        (5, 4315.54),
        (6, 5130.43),
        (7, 5931.37),
        (8, 6531.08),
        (9, 7219.8),
        (10, 7867.61),
        (11, 8278.71),
        (12, 8646.7),
        (13, 9047.84),
        (14, 9426.55),
        (15, 9645.37),
        (16, 9897.24),
        (17, 10097.6),
        (18, 10240.5),
        (19, 10532.39),
        (20, 10798.52),
        (21, 11151.43),
        (22, 11518.63),
        (23, 11806.0),
        (24, 12089.37),
        (25, 12075.41),
        (26, 12177.29),
        (27, 12211.41),
        (28, 12158.93),
        (29, 12155.27),
        (30, 12118.04),
        (31, 12140.4),
        (32, 12074.39),
    ];

    /// Every constructor must arrive at the same measurement from its own pair of inputs.
    #[test]
    fn measurement() {
        let constructed = [
            Measurement::concurrency_and_latency(3, Duration::from_millis(600)),
            Measurement::concurrency_and_throughput(3, 5.0),
            Measurement::throughput_and_latency(5.0, Duration::from_millis(600)),
        ];
        for m in constructed.iter() {
            assert_relative_eq!(m.n, 3.0);
            assert_relative_eq!(m.r, 0.6);
            assert_relative_eq!(m.x, 5.0);
        }
    }

    /// Fits the reference data set and checks the coefficients and every derived prediction.
    #[test]
    fn build() {
        let model: Model = MEASUREMENTS.iter().collect();

        // Fitted coefficients and the peak they imply.
        assert_relative_eq!(model.sigma, 0.02671591, max_relative = ACCURACY);
        assert_relative_eq!(model.kappa, 7.690945e-4, max_relative = ACCURACY);
        assert_relative_eq!(model.lambda, 995.6486, max_relative = ACCURACY);
        assert_eq!(model.max_concurrency(), 35);
        assert_relative_eq!(model.max_throughput(), 12341.7454, max_relative = ACCURACY);

        // Classification of the fitted system.
        assert!(!model.is_coherency_constrained());
        assert!(model.is_contention_constrained());
        assert!(!model.is_limitless());

        let latency_at_concurrency: [(u32, f64); 3] = [
            (1, 0.0010043702162450092),
            (20, 0.0018077244442155811),
            (35, 0.002835903510841524),
        ];
        for &(n, expected) in latency_at_concurrency.iter() {
            assert_relative_eq!(model.latency_at_concurrency(n), expected);
        }

        let throughput_at_concurrency: [(u32, f64); 3] = [
            (1, 995.648799442353),
            (20, 11063.633101824058),
            (35, 12341.74571391328),
        ];
        for &(n, expected) in throughput_at_concurrency.iter() {
            assert_relative_eq!(model.throughput_at_concurrency(n), expected);
        }

        let concurrency_at_throughput: [(f64, f64); 3] = [
            (955.0, 0.958099855673978),
            (11048.0, 15.35043561102983),
            (12201.0, 17.732208293896793),
        ];
        for &(x, expected) in concurrency_at_throughput.iter() {
            assert_relative_eq!(model.concurrency_at_throughput(x), expected);
        }

        let throughput_at_latency: [(u64, f64); 3] = [
            (30, 7047.844027581335),
            (40, 6056.536321602774),
            (50, 5387.032125730636),
        ];
        for &(millis, expected) in throughput_at_latency.iter() {
            assert_relative_eq!(
                model.throughput_at_latency(Duration::from_millis(millis)),
                expected
            );
        }

        let latency_at_throughput: [(f64, f64); 3] = [
            (7000.0, 0.0012036103337889738),
            (6000.0, 0.001165116923601453),
            (5000.0, 0.0011290093731056857),
        ];
        for &(x, expected) in latency_at_throughput.iter() {
            assert_relative_eq!(model.latency_at_throughput(x), expected);
        }

        let concurrency_at_latency: [(u64, f64); 3] = [
            (30, 177.69840792284043),
            (40, 208.52453995951137),
            (50, 235.61469338193223),
        ];
        for &(millis, expected) in concurrency_at_latency.iter() {
            assert_relative_eq!(
                model.concurrency_at_latency(Duration::from_millis(millis)),
                expected
            );
        }
    }
}