vibeio_http/
early_hints.rs1use std::{
2 collections::VecDeque,
3 sync::{Arc, Mutex},
4 task::{Context, Poll, Waker},
5};
6
7use http::{HeaderMap, Request};
8use http_body::Body;
9
/// Outcome reported back to a caller that asked for early hints to be sent.
type EarlyHintsResult = Result<(), std::io::Error>;
11pub(super) type EarlyHintsMessage = (
12 HeaderMap,
13 futures_util::lock::Mutex<oneshot::Sender<EarlyHintsResult>>,
14);
15
16#[derive(Clone)]
17pub(super) struct EarlyHints {
18 inner: Arc<Mutex<LazyEarlyHintsState>>,
19}
20
21pub(super) struct EarlyHintsReceiver {
22 inner: Arc<Mutex<LazyEarlyHintsState>>,
23}
24
25#[derive(Default)]
26struct LazyEarlyHintsState {
27 closed: bool,
28 queue: VecDeque<EarlyHintsMessage>,
29 receiver_waker: Option<Waker>,
30}
31
32impl EarlyHints {
33 #[inline]
34 pub(super) fn new_lazy() -> (Self, EarlyHintsReceiver) {
35 let inner = Arc::new(Mutex::new(LazyEarlyHintsState::default()));
36 (
37 Self {
38 inner: inner.clone(),
39 },
40 EarlyHintsReceiver { inner },
41 )
42 }
43
44 #[inline]
45 async fn send(&self, headers: HeaderMap) -> EarlyHintsResult {
46 let (tx, rx) = oneshot::async_channel();
47 let message = (headers, futures_util::lock::Mutex::new(tx));
48 let receiver_waker = {
49 let mut state = self
50 .inner
51 .lock()
52 .unwrap_or_else(|poisoned| poisoned.into_inner());
53 if state.closed {
54 return Err(std::io::Error::other("early hints receiver closed"));
55 }
56 state.queue.push_back(message);
57 state.receiver_waker.take()
58 };
59 if let Some(receiver_waker) = receiver_waker {
60 receiver_waker.wake();
61 }
62 rx.await.map_err(std::io::Error::other)?
63 }
64}
65
66impl EarlyHintsReceiver {
67 #[inline]
68 pub(super) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<EarlyHintsMessage>> {
69 let mut state = self
70 .inner
71 .lock()
72 .unwrap_or_else(|poisoned| poisoned.into_inner());
73 if let Some(message) = state.queue.pop_front() {
74 return Poll::Ready(Some(message));
75 }
76 if state.closed {
77 return Poll::Ready(None);
78 }
79 state.receiver_waker = Some(cx.waker().clone());
80 Poll::Pending
81 }
82 #[inline]
83 pub(super) fn close(&mut self) {
84 let receiver_waker = {
85 let mut state = self
86 .inner
87 .lock()
88 .unwrap_or_else(|poisoned| poisoned.into_inner());
89 if state.closed {
90 None
91 } else {
92 state.closed = true;
93 state.queue.clear();
94 state.receiver_waker.take()
95 }
96 };
97 if let Some(receiver_waker) = receiver_waker {
98 receiver_waker.wake();
99 }
100 }
101}
102
103impl Drop for EarlyHintsReceiver {
104 #[inline]
105 fn drop(&mut self) {
106 self.close();
107 }
108}
109
110#[inline]
147pub async fn send_early_hints(
148 req: &mut Request<impl Body>,
149 headers: HeaderMap,
150) -> Result<(), std::io::Error> {
151 req.extensions()
152 .get::<EarlyHints>()
153 .ok_or_else(|| std::io::Error::other("early hints not supported"))?
154 .send(headers)
155 .await
156}