use futures_util::ready;
use h2::{Ping, PingPong};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
#[derive(Clone)]
pub(crate) struct BandwidthMonitor {
inner: Arc<Mutex<Inner>>,
}
type WindowSize = u32;
impl BandwidthMonitor {
pub fn new(pinger: PingPong) -> Self {
BandwidthMonitor {
inner: Arc::new(Mutex::new(Inner {
pinger,
ping_sent: None,
bytes: 0,
bdp: Bdp::new(),
})),
}
}
pub fn append_read_bytes(&self, bytes: usize) {
let mut lock = self.inner.lock().unwrap();
lock.bytes += bytes;
}
pub fn poll_window_update(&mut self, cx: &mut Context<'_>) -> Poll<WindowSize> {
let mut lock = self.inner.lock().unwrap();
if !lock.ping_sent.is_some() {
match lock.pinger.send_ping(Ping::opaque()) {
Ok(_) => {
lock.ping_sent = Some(Instant::now());
}
Err(e) => {
debug!("Error sending ping: {}", e);
}
}
return Poll::Pending;
}
let (bytes, rtt) = match ready!(lock.pinger.poll_pong(cx)) {
Ok(_pong) => {
let rtt = lock.ping_sent.expect("Pong implies ping_sent").elapsed();
lock.ping_sent = None;
let bytes = lock.bytes;
lock.bytes = 0;
trace!("Received BDP pong; bytes = {}, rtt = {:?}", bytes, rtt);
(bytes, rtt)
}
Err(e) => {
debug!("Pong error: {}", e);
return Poll::Pending;
}
};
let window_update = lock.bdp.update(bytes, rtt);
if let Some(window_update) = window_update {
Poll::Ready(window_update)
} else {
Poll::Pending
}
}
}
struct Inner {
pinger: PingPong,
ping_sent: Option<Instant>,
bytes: usize,
bdp: Bdp,
}
const BDP_LIMIT: usize = 1024 * 1024 * 16;
struct Bdp {
bdp: u32,
largest_bandwidth: f64,
rtt: f64,
}
impl Bdp {
fn new() -> Self {
Bdp {
bdp: 0,
largest_bandwidth: 0.0,
rtt: 0.0,
}
}
fn update(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
if self.bdp as usize == BDP_LIMIT {
return None;
}
let rtt = rtt.as_secs_f64();
if self.rtt == 0.0 {
self.rtt = rtt;
} else {
self.rtt += (rtt - self.rtt) * 0.125;
}
let bw = (bytes as f64) / (self.rtt * 1.5);
trace!("Current bandwidth = {:.1}B/s", bw);
if bw <= self.largest_bandwidth {
return None;
} else {
self.largest_bandwidth = bw;
}
if bytes >= self.bdp as usize * 2 / 3 {
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
trace!("BDP increased to {}", self.bdp);
Some(self.bdp)
} else {
None
}
}
}