use super::H2Connection;
use std::{
future::Future,
io,
pin::Pin,
task::{Context, Poll, Waker},
time::{Duration, Instant},
};
/// Bookkeeping for one PING registered through `H2Connection::send_ping`.
///
/// Entries live in the connection's `pending_pings` table, keyed by the PING's
/// opaque payload, until the owning `SendPing` future collects the outcome or
/// is dropped.
#[derive(Debug)]
pub(crate) struct PendingPing {
/// Instant at which the entry was created; `complete_pending_ping` reports
/// `sent_at.elapsed()` as the PING's duration.
pub(crate) sent_at: Instant,
/// Waker stored by the most recent `SendPing::poll` that returned `Pending`.
/// Taken (and woken) when an outcome is recorded.
pub(crate) waker: Option<Waker>,
/// Outcome waiting to be picked up by the future: `None` until one is
/// recorded, then either the elapsed time or the failure.
pub(crate) completed: Option<io::Result<Duration>>,
}
/// Future returned by `H2Connection::send_ping`.
///
/// Resolves to the elapsed time recorded for the PING, or to an `io::Error`
/// if the PING was failed or its payload was already in flight.
#[must_use = "futures do nothing unless awaited"]
#[derive(Debug)]
pub struct SendPing<'a> {
/// Connection whose `pending_pings` table holds this PING's entry.
pub(super) connection: &'a H2Connection,
/// Opaque 8-byte payload; also the key of the entry in `pending_pings`.
pub(super) opaque: [u8; 8],
/// `true` while this future owns an entry in `pending_pings` that must be
/// removed when the future finishes or is dropped. `false` when `send_ping`
/// rejected the payload as a duplicate, and after `poll` has returned the
/// recorded outcome.
pub(super) needs_cleanup: bool,
}
impl Future for SendPing<'_> {
    type Output = io::Result<Duration>;

    /// Resolves with the outcome stored in this PING's `pending_pings` entry.
    ///
    /// * When `needs_cleanup` is `false` (the payload was rejected as a
    ///   duplicate, or the future is polled again after completing), this is
    ///   immediately ready with an `io::ErrorKind::AlreadyExists` error.
    /// * When an outcome has been recorded, the entry is removed from the
    ///   table and the outcome is returned.
    /// * Otherwise the current task's waker is stored in the entry and
    ///   `Poll::Pending` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the `pending_pings` mutex is poisoned, or if the entry for
    /// this payload is missing while the future still expects to own it.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ping = self.get_mut();

        if !ping.needs_cleanup {
            let duplicate = io::Error::new(
                io::ErrorKind::AlreadyExists,
                "PING with this opaque payload is already in flight",
            );
            return Poll::Ready(Err(duplicate));
        }

        let mut table = ping
            .connection
            .pending_pings
            .lock()
            .expect("pending_pings mutex poisoned");
        let slot = table
            .get_mut(&ping.opaque)
            .expect("pending_pings entry removed while SendPing future still pending");

        match slot.completed.take() {
            Some(outcome) => {
                // The outcome is being handed over, so the entry is no longer
                // needed and `Drop` has nothing left to clean up.
                table.remove(&ping.opaque);
                ping.needs_cleanup = false;
                Poll::Ready(outcome)
            }
            None => {
                // Always store the latest waker: the future may have moved to
                // a different task since the previous poll.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
impl Drop for SendPing<'_> {
fn drop(&mut self) {
if self.needs_cleanup
&& let Ok(mut pending) = self.connection.pending_pings.lock()
{
pending.remove(&self.opaque);
}
}
}
impl H2Connection {
pub fn send_ping(&self, opaque: [u8; 8]) -> SendPing<'_> {
let mut pending = self
.pending_pings
.lock()
.expect("pending_pings mutex poisoned");
if pending.contains_key(&opaque) {
return SendPing {
connection: self,
opaque,
needs_cleanup: false,
};
}
pending.insert(
opaque,
PendingPing {
sent_at: Instant::now(),
waker: None,
completed: None,
},
);
drop(pending);
self.pending_ping_outbound
.lock()
.expect("pending_ping_outbound mutex poisoned")
.push_back(opaque);
self.outbound_waker.wake();
SendPing {
connection: self,
opaque,
needs_cleanup: true,
}
}
pub(in crate::h2) fn drain_pending_ping_outbound(&self) -> Vec<[u8; 8]> {
let mut queue = self
.pending_ping_outbound
.lock()
.expect("pending_ping_outbound mutex poisoned");
queue.drain(..).collect()
}
pub(in crate::h2) fn complete_pending_ping(&self, opaque: [u8; 8]) {
let mut pending = self
.pending_pings
.lock()
.expect("pending_pings mutex poisoned");
if let Some(entry) = pending.get_mut(&opaque) {
let elapsed = entry.sent_at.elapsed();
entry.completed = Some(Ok(elapsed));
if let Some(waker) = entry.waker.take() {
waker.wake();
}
}
}
pub(in crate::h2) fn fail_pending_pings(
&self,
error_kind: io::ErrorKind,
message: &'static str,
) {
let mut pending = self
.pending_pings
.lock()
.expect("pending_pings mutex poisoned");
for entry in pending.values_mut() {
if entry.completed.is_none() {
entry.completed = Some(Err(io::Error::new(error_kind, message)));
if let Some(waker) = entry.waker.take() {
waker.wake();
}
}
}
}
}