#![allow(clippy::items_after_test_module)]
use std::{
fmt,
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
task::{self, Poll},
time::{Duration, Instant},
};
use http2::{Ping, PingPong};
use parking_lot::Mutex;
use crate::client::core::{
self, Error,
rt::{Sleep, Time},
};
type WindowSize = u32;
pub(super) fn disabled() -> Recorder {
Recorder { shared: None }
}
pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
debug_assert!(
config.is_enabled(),
"ping channel requires bdp or keep-alive config",
);
let bdp = config.bdp_initial_window.map(|wnd| Bdp {
bdp: wnd,
max_bandwidth: 0.0,
rtt: 0.0,
ping_delay: Duration::from_millis(100),
stable_count: 0,
});
let now = timer.now();
let bdp_enabled = bdp.is_some();
let keep_alive_enabled = config.keep_alive_interval.is_some();
let next_bdp_at = if bdp_enabled { Some(now) } else { None };
let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
interval,
timeout: config.keep_alive_timeout,
while_idle: config.keep_alive_while_idle,
sleep: timer.sleep(interval),
state: KeepAliveState::Init,
timer: timer.clone(),
});
let shared = Arc::new(Shared {
bytes: AtomicU64::new(0),
last_read_at: AtomicU64::new(0),
is_keep_alive_timed_out: AtomicBool::new(false),
bdp_enabled,
keep_alive_enabled,
start: now,
timer: timer.clone(),
inner: Mutex::new(PingInner {
ping_pong,
ping_sent_at: None,
next_bdp_at,
}),
});
(
Recorder {
shared: Some(shared.clone()),
},
Ponger {
bdp,
keep_alive,
shared,
},
)
}
#[derive(Debug, Clone)]
pub(crate) struct Config {
bdp_initial_window: Option<WindowSize>,
keep_alive_interval: Option<Duration>,
keep_alive_timeout: Duration,
keep_alive_while_idle: bool,
}
#[derive(Clone)]
pub(crate) struct Recorder {
shared: Option<Arc<Shared>>,
}
pub(super) struct Ponger {
bdp: Option<Bdp>,
keep_alive: Option<KeepAlive>,
shared: Arc<Shared>,
}
struct Shared {
bytes: AtomicU64,
last_read_at: AtomicU64,
is_keep_alive_timed_out: AtomicBool,
bdp_enabled: bool,
keep_alive_enabled: bool,
start: Instant,
timer: Time,
inner: Mutex<PingInner>,
}
struct PingInner {
ping_pong: PingPong,
ping_sent_at: Option<Instant>,
next_bdp_at: Option<Instant>,
}
struct Bdp {
bdp: u32,
max_bandwidth: f64,
rtt: f64,
ping_delay: Duration,
stable_count: u32,
}
struct KeepAlive {
interval: Duration,
timeout: Duration,
while_idle: bool,
state: KeepAliveState,
sleep: Pin<Box<dyn Sleep>>,
timer: Time,
}
enum KeepAliveState {
Init,
Scheduled(Instant),
PingSent,
}
pub(super) enum Ponged {
SizeUpdate(WindowSize),
KeepAliveTimedOut,
}
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
impl Config {
pub(crate) fn new(
adaptive_window: bool,
initial_window_size: u32,
keep_alive_interval: Option<Duration>,
keep_alive_timeout: Duration,
keep_alive_while_idle: bool,
) -> Self {
Config {
bdp_initial_window: if adaptive_window {
Some(initial_window_size)
} else {
None
},
keep_alive_interval,
keep_alive_timeout,
keep_alive_while_idle,
}
}
pub(super) fn is_enabled(&self) -> bool {
self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
}
}
impl Recorder {
pub(crate) fn record_data(&self, len: usize) {
let shared = if let Some(ref shared) = self.shared {
shared
} else {
return;
};
let now = shared.timer.now();
shared.update_last_read_at(now);
if !shared.bdp_enabled {
return;
}
let mut inner = shared.inner.lock();
if let Some(next_bdp_at) = inner.next_bdp_at {
if now < next_bdp_at {
return;
}
inner.next_bdp_at = None;
}
shared.bytes.fetch_add(len as u64, Ordering::Relaxed);
if !inner.is_ping_sent() {
inner.send_ping(&shared.timer);
}
}
pub(crate) fn record_non_data(&self) {
let shared = if let Some(ref shared) = self.shared {
shared
} else {
return;
};
let now = shared.timer.now();
shared.update_last_read_at(now);
}
pub(super) fn for_stream(self, stream: &http2::RecvStream) -> Self {
if stream.is_end_stream() {
disabled()
} else {
self
}
}
pub(super) fn ensure_not_timed_out(&self) -> core::Result<()> {
if let Some(ref shared) = self.shared
&& shared.is_keep_alive_timed_out.load(Ordering::Acquire)
{
return Err(KeepAliveTimedOut.crate_error());
}
Ok(())
}
}
impl Ponger {
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
let now = self.shared.timer.now();
let is_idle = self.is_idle();
let mut inner = self.shared.inner.lock();
if let Some(ref mut ka) = self.keep_alive {
ka.maybe_schedule(is_idle, &self.shared, &inner);
ka.maybe_ping(cx, is_idle, &self.shared, &mut inner);
}
if !inner.is_ping_sent() {
return Poll::Pending;
}
match inner.ping_pong.poll_pong(cx) {
Poll::Ready(Ok(_pong)) => {
let start = inner
.ping_sent_at
.take()
.expect("pong received implies ping_sent_at");
let rtt = now - start;
trace!("recv pong");
if let Some(ref mut ka) = self.keep_alive {
self.shared.update_last_read_at(now);
ka.maybe_schedule(is_idle, &self.shared, &inner);
ka.maybe_ping(cx, is_idle, &self.shared, &mut inner);
}
if let Some(ref mut bdp) = self.bdp {
let bytes = self.shared.bytes.swap(0, Ordering::AcqRel) as usize;
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
let update = bdp.calculate(bytes, rtt);
inner.next_bdp_at = Some(now + bdp.ping_delay);
if let Some(update) = update {
return Poll::Ready(Ponged::SizeUpdate(update));
}
}
}
Poll::Ready(Err(_e)) => {
debug!("pong error: {}", _e);
}
Poll::Pending => {
if let Some(ref mut ka) = self.keep_alive
&& let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx)
{
self.keep_alive = None;
self.shared
.is_keep_alive_timed_out
.store(true, Ordering::Release);
return Poll::Ready(Ponged::KeepAliveTimedOut);
}
}
}
Poll::Pending
}
fn is_idle(&self) -> bool {
Arc::strong_count(&self.shared) <= 2
}
}
impl Shared {
fn update_last_read_at(&self, now: Instant) {
if self.keep_alive_enabled {
let nanos = now.duration_since(self.start).as_nanos() as u64;
self.last_read_at.store(nanos, Ordering::Release);
}
}
fn last_read_at(&self) -> Instant {
let nanos = self.last_read_at.load(Ordering::Acquire);
self.start + Duration::from_nanos(nanos)
}
}
impl PingInner {
fn send_ping(&mut self, timer: &Time) {
match self.ping_pong.send_ping(Ping::opaque()) {
Ok(()) => {
self.ping_sent_at = Some(timer.now());
trace!("sent ping");
}
Err(_err) => {
debug!("error sending ping: {}", _err);
}
}
}
fn is_ping_sent(&self) -> bool {
self.ping_sent_at.is_some()
}
}
const BDP_LIMIT: usize = 1024 * 1024 * 16;
impl Bdp {
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
if self.bdp as usize == BDP_LIMIT {
self.stabilize_delay();
return None;
}
let rtt = seconds(rtt);
if self.rtt == 0.0 {
self.rtt = rtt;
} else {
self.rtt += (rtt - self.rtt) * 0.125;
}
let bw = (bytes as f64) / (self.rtt * 1.5);
trace!("current bandwidth = {:.1}B/s", bw);
if bw < self.max_bandwidth {
self.stabilize_delay();
return None;
} else {
self.max_bandwidth = bw;
}
if bytes >= self.bdp as usize * 2 / 3 {
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
trace!("BDP increased to {}", self.bdp);
self.stable_count = 0;
self.ping_delay /= 2;
Some(self.bdp)
} else {
self.stabilize_delay();
None
}
}
fn stabilize_delay(&mut self) {
if self.ping_delay < Duration::from_secs(10) {
self.stable_count += 1;
if self.stable_count >= 2 {
self.ping_delay *= 4;
self.stable_count = 0;
}
}
}
}
fn seconds(dur: Duration) -> f64 {
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
let secs = dur.as_secs() as f64;
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
}
impl KeepAlive {
fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared, inner: &PingInner) {
match self.state {
KeepAliveState::Init => {
if !self.while_idle && is_idle {
return;
}
self.schedule(shared);
}
KeepAliveState::PingSent => {
if inner.is_ping_sent() {
return;
}
self.schedule(shared);
}
KeepAliveState::Scheduled(..) => (),
}
}
fn schedule(&mut self, shared: &Shared) {
let interval = shared.last_read_at() + self.interval;
self.state = KeepAliveState::Scheduled(interval);
self.timer.reset(&mut self.sleep, interval);
}
fn maybe_ping(
&mut self,
cx: &mut task::Context<'_>,
is_idle: bool,
shared: &Shared,
inner: &mut PingInner,
) {
match self.state {
KeepAliveState::Scheduled(at) => {
if Pin::new(&mut self.sleep).poll(cx).is_pending() {
return;
}
if shared.last_read_at() + self.interval > at {
self.state = KeepAliveState::Init;
cx.waker().wake_by_ref(); return;
}
if !self.while_idle && is_idle {
trace!("keep-alive no need to ping when idle and while_idle=false");
return;
}
trace!("keep-alive interval ({:?}) reached", self.interval);
inner.send_ping(&shared.timer);
self.state = KeepAliveState::PingSent;
let timeout = self.timer.now() + self.timeout;
self.timer.reset(&mut self.sleep, timeout);
}
KeepAliveState::Init | KeepAliveState::PingSent => (),
}
}
fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
match self.state {
KeepAliveState::PingSent => {
if Pin::new(&mut self.sleep).poll(cx).is_pending() {
return Ok(());
}
trace!("keep-alive timeout ({:?}) reached", self.timeout);
Err(KeepAliveTimedOut)
}
KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::mpsc;
use tokio::{io::duplex, runtime::Builder as RuntimeBuilder};
use super::*;
use crate::client::core::rt::{ArcTimer, TokioTimer};
async fn build_ping_pong() -> PingPong {
let (client_io, server_io) = duplex(1024);
let client = tokio::spawn(async move { http2::client::handshake(client_io).await });
let server = tokio::spawn(async move { http2::server::handshake(server_io).await });
let (client_res, server_res) = tokio::join!(client, server);
let (_, mut client_conn) = client_res.expect("client join").expect("client handshake");
let _server_conn = server_res.expect("server join").expect("server handshake");
client_conn.ping_pong().expect("ping_pong")
}
#[test]
fn record_data_does_not_block_when_bdp_disabled() {
let rt = RuntimeBuilder::new_current_thread()
.enable_all()
.build()
.expect("runtime");
let (recorder, _ponger) = rt.block_on(async {
let ping_pong = build_ping_pong().await;
let config = Config::new(
false,
0,
Some(Duration::from_secs(60)),
Duration::from_secs(5),
true,
);
let timer = Time::Timer(ArcTimer::new(TokioTimer::new()));
channel(ping_pong, config, timer)
});
let shared = recorder.shared.as_ref().expect("shared").clone();
let (locked_tx, locked_rx) = mpsc::channel();
let (release_tx, release_rx) = mpsc::channel();
let lock_handle = std::thread::spawn(move || {
let _guard = shared.inner.lock();
let _ = locked_tx.send(());
let _ = release_rx.recv();
});
locked_rx.recv().expect("lock acquired");
let recorder_clone = recorder.clone();
let (done_tx, done_rx) = mpsc::channel();
std::thread::spawn(move || {
recorder_clone.record_data(1);
let _ = done_tx.send(());
});
assert!(
done_rx.recv_timeout(Duration::from_millis(50)).is_ok(),
"record_data blocked while shared lock held"
);
let _ = release_tx.send(());
let _ = lock_handle.join();
}
}
impl KeepAliveTimedOut {
pub(super) fn crate_error(self) -> Error {
Error::new(crate::client::core::error::Kind::Http2).with(self)
}
}
impl fmt::Display for KeepAliveTimedOut {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("keep-alive timed out")
}
}
impl std::error::Error for KeepAliveTimedOut {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&crate::client::core::error::TimedOut)
}
}