use std::{
collections::VecDeque,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
};
use http::{HeaderMap, Request};
use http_body::Body;
type EarlyHintsResult = Result<(), std::io::Error>;
pub(super) type EarlyHintsMessage = (
HeaderMap,
futures_util::lock::Mutex<oneshot::Sender<EarlyHintsResult>>,
);
#[derive(Clone)]
pub(super) struct EarlyHints {
inner: Arc<Mutex<LazyEarlyHintsState>>,
}
pub(super) struct EarlyHintsReceiver {
inner: Arc<Mutex<LazyEarlyHintsState>>,
}
#[derive(Default)]
struct LazyEarlyHintsState {
closed: bool,
queue: VecDeque<EarlyHintsMessage>,
receiver_waker: Option<Waker>,
}
impl EarlyHints {
#[inline]
pub(super) fn new_lazy() -> (Self, EarlyHintsReceiver) {
let inner = Arc::new(Mutex::new(LazyEarlyHintsState::default()));
(
Self {
inner: inner.clone(),
},
EarlyHintsReceiver { inner },
)
}
#[inline]
async fn send(&self, headers: HeaderMap) -> EarlyHintsResult {
let (tx, rx) = oneshot::async_channel();
let message = (headers, futures_util::lock::Mutex::new(tx));
let receiver_waker = {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if state.closed {
return Err(std::io::Error::other("early hints receiver closed"));
}
state.queue.push_back(message);
state.receiver_waker.take()
};
if let Some(receiver_waker) = receiver_waker {
receiver_waker.wake();
}
rx.await.map_err(std::io::Error::other)?
}
}
impl EarlyHintsReceiver {
#[inline]
pub(super) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<EarlyHintsMessage>> {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(message) = state.queue.pop_front() {
return Poll::Ready(Some(message));
}
if state.closed {
return Poll::Ready(None);
}
state.receiver_waker = Some(cx.waker().clone());
Poll::Pending
}
#[inline]
pub(super) fn close(&mut self) {
let receiver_waker = {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if state.closed {
None
} else {
state.closed = true;
state.queue.clear();
state.receiver_waker.take()
}
};
if let Some(receiver_waker) = receiver_waker {
receiver_waker.wake();
}
}
}
impl Drop for EarlyHintsReceiver {
#[inline]
fn drop(&mut self) {
self.close();
}
}
#[inline]
pub async fn send_early_hints(
req: &mut Request<impl Body>,
headers: HeaderMap,
) -> Result<(), std::io::Error> {
req.extensions()
.get::<EarlyHints>()
.ok_or_else(|| std::io::Error::other("early hints not supported"))?
.send(headers)
.await
}